Skip to content

Commit

Permalink
fix: translate error of old ApisixRoute (#1191)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlinsRan committed Jul 27, 2022
1 parent 8b51c6e commit 3d720c0
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 74 deletions.
98 changes: 79 additions & 19 deletions pkg/ingress/apisix_route.go
Expand Up @@ -326,25 +326,7 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
} else if ev.Type == types.EventAdd {
added = m
} else {
var oldCtx *translation.TranslateContext
switch obj.GroupVersion {
case config.ApisixV2beta2:
oldCtx, err = c.controller.translator.TranslateRouteV2beta2(obj.OldObject.V2beta2())
case config.ApisixV2beta3:
oldCtx, err = c.controller.translator.TranslateRouteV2beta3(obj.OldObject.V2beta3())
case config.ApisixV2:
oldCtx, err = c.controller.translator.TranslateRouteV2(obj.OldObject.V2())
}
if err != nil {
log.Errorw("failed to translate old ApisixRoute",
zap.String("version", obj.GroupVersion),
zap.String("event", "update"),
zap.Error(err),
zap.Any("ApisixRoute", ar),
)
return err
}

oldCtx, _ := c.getOldTranslateContext(ctx, obj.OldObject)
om := &utils.Manifest{
Routes: oldCtx.Routes,
Upstreams: oldCtx.Upstreams,
Expand Down Expand Up @@ -693,3 +675,81 @@ func (c *apisixRouteController) handleSvcErr(key string, errOrigin error) {
)
c.workqueue.AddRateLimited(key)
}

// Building objects from cache
// For old objects, you cannot use TranslateRoute to build. Because it needs to parse the latest service, which will cause data inconsistency
func (c *apisixRouteController) getOldTranslateContext(ctx context.Context, kar kube.ApisixRoute) (*translation.TranslateContext, error) {
clusterName := c.controller.cfg.APISIX.DefaultClusterName
oldCtx := translation.DefaultEmptyTranslateContext()

switch c.controller.cfg.Kubernetes.ApisixRouteVersion {
case config.ApisixV2beta3:
ar := kar.V2beta3()
for _, part := range ar.Spec.Stream {
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
sr, err := c.controller.apisix.Cluster(clusterName).StreamRoute().Get(ctx, name)
if err != nil {
continue
}
if sr.UpstreamId != "" {
ups := apisixv1.NewDefaultUpstream()
ups.ID = sr.UpstreamId
oldCtx.AddUpstream(ups)
}
oldCtx.AddStreamRoute(sr)
}
for _, part := range ar.Spec.HTTP {
name := apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
r, err := c.controller.apisix.Cluster(clusterName).Route().Get(ctx, name)
if err != nil {
continue
}
if r.UpstreamId != "" {
ups := apisixv1.NewDefaultUpstream()
ups.ID = r.UpstreamId
oldCtx.AddUpstream(ups)
}
if r.PluginConfigId != "" {
pc := apisixv1.NewDefaultPluginConfig()
pc.ID = r.PluginConfigId
oldCtx.AddPluginConfig(pc)
}
oldCtx.AddRoute(r)
}
case config.ApisixV2:
ar := kar.V2()
for _, part := range ar.Spec.Stream {
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
sr, err := c.controller.apisix.Cluster(clusterName).StreamRoute().Get(ctx, name)
if err != nil {
continue
}
if sr.UpstreamId != "" {
ups := apisixv1.NewDefaultUpstream()
ups.ID = sr.UpstreamId
oldCtx.AddUpstream(ups)
}
oldCtx.AddStreamRoute(sr)
}
for _, part := range ar.Spec.HTTP {
name := apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
r, err := c.controller.apisix.Cluster(clusterName).Route().Get(ctx, name)
if err != nil {
continue
}
if r.UpstreamId != "" {
ups := apisixv1.NewDefaultUpstream()
ups.ID = r.UpstreamId
oldCtx.AddUpstream(ups)
}
if r.PluginConfigId != "" {
pc := apisixv1.NewDefaultPluginConfig()
pc.ID = r.PluginConfigId
oldCtx.AddPluginConfig(pc)
}
oldCtx.AddRoute(r)

}
}
return oldCtx, nil
}
111 changes: 56 additions & 55 deletions pkg/ingress/utils/manifest.go
Expand Up @@ -211,9 +211,65 @@ func (m *Manifest) Diff(om *Manifest) (added, updated, deleted *Manifest) {
return
}

// Due to dependency, delete priority should be last
func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string, added, updated, deleted *Manifest) error {
var merr *multierror.Error

if added != nil {
// Should create upstreams firstly due to the dependencies.
for _, ssl := range added.SSLs {
if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, u := range added.Upstreams {
if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, pc := range added.PluginConfigs {
if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range added.Routes {
if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, sr := range added.StreamRoutes {
if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if updated != nil {
for _, ssl := range updated.SSLs {
if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range updated.Upstreams {
if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, pc := range updated.PluginConfigs {
if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range updated.Routes {
if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, sr := range updated.StreamRoutes {
if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if deleted != nil {
for _, ssl := range deleted.SSLs {
if err := apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil {
Expand Down Expand Up @@ -310,61 +366,6 @@ func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string
}
}
}
if added != nil {
// Should create upstreams firstly due to the dependencies.
for _, ssl := range added.SSLs {
if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, u := range added.Upstreams {
if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, pc := range added.PluginConfigs {
if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range added.Routes {
if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, sr := range added.StreamRoutes {
if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if updated != nil {
for _, ssl := range updated.SSLs {
if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range updated.Upstreams {
if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, pc := range updated.PluginConfigs {
if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, r := range updated.Routes {
if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
merr = multierror.Append(merr, err)
}
}
for _, sr := range updated.StreamRoutes {
if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if merr != nil {
return merr
}
Expand Down
108 changes: 108 additions & 0 deletions test/e2e/suite-chore/consistency.go
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chore

import (
"fmt"
"net/http"
"time"

ginkgo "github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

var (
_routeConfig = `
apiVersion: apisix.apache.org/v2beta3
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
hosts:
- httpbin.org
paths:
- /*
backends:
- serviceName: %s
servicePort: %d
`
_httpServiceConfig = `
apiVersion: v1
kind: Service
metadata:
name: httpbin-service-e2e-test
spec:
selector:
app: httpbin-deployment-e2e-test
ports:
- name: http
port: %d
protocol: TCP
targetPort: %d
type: ClusterIP
`
)

var _ = ginkgo.Describe("suite-chore: Consistency between APISIX and Ingress", func() {
suites := func(s *scaffold.Scaffold) {
ginkgo.It("ApisixRoute and APISIX of route and upstream", func() {
httpService := fmt.Sprintf(_httpServiceConfig, 8080, 8080)
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(httpService))

ar := fmt.Sprintf(_routeConfig, "httpbin-service-e2e-test", 8080)
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar))

assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1))
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1))

upstreams, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), upstreams, 1)
assert.Contains(ginkgo.GinkgoT(), upstreams[0].Name, "httpbin-service-e2e-test_8080")
// The correct httpbin pod port is 80
s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusBadGateway)

httpService = fmt.Sprintf(_httpServiceConfig, 80, 80)
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(httpService))

ar = fmt.Sprintf(_routeConfig, "httpbin-service-e2e-test", 80)
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar))

time.Sleep(6 * time.Second)

routes, err := s.ListApisixRoutes()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), routes, 1)
upstreams, err = s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), upstreams, 1)
assert.Contains(ginkgo.GinkgoT(), upstreams[0].Name, "httpbin-service-e2e-test_80")

s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
})
}

ginkgo.Describe("suite-chore: scaffold v2beta3", func() {
suites(scaffold.NewDefaultV2beta3Scaffold())
})
ginkgo.Describe("suite-chore: scaffold v2", func() {
suites(scaffold.NewDefaultV2Scaffold())
})
})

0 comments on commit 3d720c0

Please sign in to comment.