diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go index c6461af8c4..89c8824ab6 100644 --- a/pkg/ingress/apisix_route.go +++ b/pkg/ingress/apisix_route.go @@ -326,25 +326,7 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error } else if ev.Type == types.EventAdd { added = m } else { - var oldCtx *translation.TranslateContext - switch obj.GroupVersion { - case config.ApisixV2beta2: - oldCtx, err = c.controller.translator.TranslateRouteV2beta2(obj.OldObject.V2beta2()) - case config.ApisixV2beta3: - oldCtx, err = c.controller.translator.TranslateRouteV2beta3(obj.OldObject.V2beta3()) - case config.ApisixV2: - oldCtx, err = c.controller.translator.TranslateRouteV2(obj.OldObject.V2()) - } - if err != nil { - log.Errorw("failed to translate old ApisixRoute", - zap.String("version", obj.GroupVersion), - zap.String("event", "update"), - zap.Error(err), - zap.Any("ApisixRoute", ar), - ) - return err - } - + oldCtx, _ := c.getOldTranslateContext(ctx, obj.OldObject) om := &utils.Manifest{ Routes: oldCtx.Routes, Upstreams: oldCtx.Upstreams, @@ -693,3 +675,81 @@ func (c *apisixRouteController) handleSvcErr(key string, errOrigin error) { ) c.workqueue.AddRateLimited(key) } + +// Building objects from cache +// For old objects, you cannot use TranslateRoute to build. Because it needs to parse the latest service, which will cause data inconsistency +func (c *apisixRouteController) getOldTranslateContext(ctx context.Context, kar kube.ApisixRoute) (*translation.TranslateContext, error) { + clusterName := c.controller.cfg.APISIX.DefaultClusterName + oldCtx := translation.DefaultEmptyTranslateContext() + + switch c.controller.cfg.Kubernetes.ApisixRouteVersion { + case config.ApisixV2beta3: + ar := kar.V2beta3() + for _, part := range ar.Spec.Stream { + name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) + sr, err := c.controller.apisix.Cluster(clusterName).StreamRoute().Get(ctx, name) + if err != nil { + continue + } + if sr.UpstreamId != "" { + ups := apisixv1.NewDefaultUpstream() + ups.ID = sr.UpstreamId + oldCtx.AddUpstream(ups) + } + oldCtx.AddStreamRoute(sr) + } + for _, part := range ar.Spec.HTTP { + name := apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) + r, err := c.controller.apisix.Cluster(clusterName).Route().Get(ctx, name) + if err != nil { + continue + } + if r.UpstreamId != "" { + ups := apisixv1.NewDefaultUpstream() + ups.ID = r.UpstreamId + oldCtx.AddUpstream(ups) + } + if r.PluginConfigId != "" { + pc := apisixv1.NewDefaultPluginConfig() + pc.ID = r.PluginConfigId + oldCtx.AddPluginConfig(pc) + } + oldCtx.AddRoute(r) + } + case config.ApisixV2: + ar := kar.V2() + for _, part := range ar.Spec.Stream { + name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) + sr, err := c.controller.apisix.Cluster(clusterName).StreamRoute().Get(ctx, name) + if err != nil { + continue + } + if sr.UpstreamId != "" { + ups := apisixv1.NewDefaultUpstream() + ups.ID = sr.UpstreamId + oldCtx.AddUpstream(ups) + } + oldCtx.AddStreamRoute(sr) + } + for _, part := range ar.Spec.HTTP { + name := apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) + r, err := c.controller.apisix.Cluster(clusterName).Route().Get(ctx, name) + if err != nil { + continue + } + if r.UpstreamId != "" { + ups := apisixv1.NewDefaultUpstream() + ups.ID = r.UpstreamId + oldCtx.AddUpstream(ups) + } + if r.PluginConfigId != "" { + pc := apisixv1.NewDefaultPluginConfig() + pc.ID = r.PluginConfigId + oldCtx.AddPluginConfig(pc) + } + oldCtx.AddRoute(r) + + } + } + return oldCtx, nil +} diff --git a/pkg/ingress/utils/manifest.go b/pkg/ingress/utils/manifest.go index c67de92d97..09c2b0f3a0 100644 --- a/pkg/ingress/utils/manifest.go +++ b/pkg/ingress/utils/manifest.go @@ -211,9 +211,65 @@ func (m *Manifest) Diff(om *Manifest) (added, updated, deleted *Manifest) { return } +// Due to dependency, delete priority should be last func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string, added, updated, deleted *Manifest) error { var merr *multierror.Error + if added != nil { + // Should create upstreams firstly due to the dependencies. + for _, ssl := range added.SSLs { + if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, u := range added.Upstreams { + if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, pc := range added.PluginConfigs { + if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, r := range added.Routes { + if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, sr := range added.StreamRoutes { + if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { + merr = multierror.Append(merr, err) + } + } + } + if updated != nil { + for _, ssl := range updated.SSLs { + if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, r := range updated.Upstreams { + if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, pc := range updated.PluginConfigs { + if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, r := range updated.Routes { + if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil { + merr = multierror.Append(merr, err) + } + } + for _, sr := range updated.StreamRoutes { + if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { + merr = multierror.Append(merr, err) + } + } + } if deleted != nil { for _, ssl := range deleted.SSLs { if err := apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil { @@ -310,61 +366,6 @@ func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string } } } - if added != nil { - // Should create upstreams firstly due to the dependencies. - for _, ssl := range added.SSLs { - if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, u := range added.Upstreams { - if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, pc := range added.PluginConfigs { - if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, r := range added.Routes { - if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, sr := range added.StreamRoutes { - if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { - merr = multierror.Append(merr, err) - } - } - } - if updated != nil { - for _, ssl := range updated.SSLs { - if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, r := range updated.Upstreams { - if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, pc := range updated.PluginConfigs { - if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, r := range updated.Routes { - if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil { - merr = multierror.Append(merr, err) - } - } - for _, sr := range updated.StreamRoutes { - if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil { - merr = multierror.Append(merr, err) - } - } - } if merr != nil { return merr } diff --git a/test/e2e/suite-chore/consistency.go b/test/e2e/suite-chore/consistency.go new file mode 100644 index 0000000000..a6cb3a3e24 --- /dev/null +++ b/test/e2e/suite-chore/consistency.go @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package chore + +import ( + "fmt" + "net/http" + "time" + + ginkgo "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var ( + _routeConfig = ` +apiVersion: apisix.apache.org/v2beta3 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.org + paths: + - /* + backends: + - serviceName: %s + servicePort: %d +` + _httpServiceConfig = ` +apiVersion: v1 +kind: Service +metadata: + name: httpbin-service-e2e-test +spec: + selector: + app: httpbin-deployment-e2e-test + ports: + - name: http + port: %d + protocol: TCP + targetPort: %d + type: ClusterIP +` +) + +var _ = ginkgo.Describe("suite-chore: Consistency between APISIX and Ingress", func() { + suites := func(s *scaffold.Scaffold) { + ginkgo.It("ApisixRoute and APISIX of route and upstream", func() { + httpService := fmt.Sprintf(_httpServiceConfig, 8080, 8080) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(httpService)) + + ar := fmt.Sprintf(_routeConfig, "httpbin-service-e2e-test", 8080) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar)) + + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1)) + assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1)) + + upstreams, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), upstreams, 1) + assert.Contains(ginkgo.GinkgoT(), upstreams[0].Name, "httpbin-service-e2e-test_8080") + // The correct httpbin pod port is 80 + s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusBadGateway) + + httpService = fmt.Sprintf(_httpServiceConfig, 80, 80) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(httpService)) + + ar = fmt.Sprintf(_routeConfig, "httpbin-service-e2e-test", 80) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar)) + + time.Sleep(6 * time.Second) + + routes, err := s.ListApisixRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), routes, 1) + upstreams, err = s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), upstreams, 1) + assert.Contains(ginkgo.GinkgoT(), upstreams[0].Name, "httpbin-service-e2e-test_80") + + s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK) + }) + } + + ginkgo.Describe("suite-chore: scaffold v2beta3", func() { + suites(scaffold.NewDefaultV2beta3Scaffold()) + }) + ginkgo.Describe("suite-chore: scaffold v2", func() { + suites(scaffold.NewDefaultV2Scaffold()) + }) +})