Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support resolveGranularity of ApisixRoute #1251

Merged
merged 14 commits into from
Sep 9, 2022
7 changes: 5 additions & 2 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,13 @@ type PluginConfig interface {
}

type UpstreamServiceRelation interface {
// Get relation based on namespace+"_"+service.name
Get(context.Context, string) (*v1.UpstreamServiceRelation, error)
List(context.Context) ([]*v1.UpstreamServiceRelation, error)
Delete(context.Context, *v1.UpstreamServiceRelation) error
Create(context.Context, *v1.UpstreamServiceRelation) error
// Delete relation based on namespace+"_"+service.name
Delete(context.Context, string) error
// Build relation based on upstream.name
Create(context.Context, string) error
}

type apisix struct {
Expand Down
6 changes: 4 additions & 2 deletions pkg/apisix/cache/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,10 @@ func TestMemDBCacheUpstreamServiceRelation(t *testing.T) {
assert.Equal(t, us2, us)

us3 := &v1.UpstreamServiceRelation{
ServiceName: "httpbin",
UpstreamName: "upstream",
ServiceName: "httpbin",
UpstreamNames: map[string]struct{}{
"upstream": {},
},
}
assert.Nil(t, c.InsertUpstreamServiceRelation(us3), "inserting upstream_service 3")

Expand Down
4 changes: 2 additions & 2 deletions pkg/apisix/nonexistentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,13 @@ type dummyUpstreamServiceRelation struct {
func (f *dummyUpstreamServiceRelation) Get(_ context.Context, _ string) (*v1.UpstreamServiceRelation, error) {
return nil, ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ *v1.UpstreamServiceRelation) error {
func (f *dummyUpstreamServiceRelation) Create(_ context.Context, _ string) error {
return ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) List(_ context.Context) ([]*v1.UpstreamServiceRelation, error) {
return nil, ErrClusterNotExist
}
func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ *v1.UpstreamServiceRelation) error {
func (f *dummyUpstreamServiceRelation) Delete(_ context.Context, _ string) error {
return ErrClusterNotExist
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/apisix/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ func TestItemConvertUpstream(t *testing.T) {
ups, err := ite.upstream()
assert.Nil(t, err)
assert.Len(t, ups.Nodes, 2)
assert.Equal(t, ups.Nodes[0], v1.UpstreamNode{Host: "httpbin.org", Port: 80, Weight: 1})
assert.Equal(t, ups.Nodes[1], v1.UpstreamNode{Host: "foo.com", Port: 8080, Weight: 2})
if ups.Nodes[0].Host == "foo.com" {
ups.Nodes[0], ups.Nodes[1] = ups.Nodes[1], ups.Nodes[0]
lingsamuel marked this conversation as resolved.
Show resolved Hide resolved
}
assert.Equal(t, v1.UpstreamNode{Host: "httpbin.org", Port: 80, Weight: 1}, ups.Nodes[0])
assert.Equal(t, v1.UpstreamNode{Host: "foo.com", Port: 8080, Weight: 2}, ups.Nodes[1])

ite = &item{
Key: "/apisix/upstreams/419655639963271872",
Expand Down
7 changes: 2 additions & 5 deletions pkg/apisix/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
zap.String("cluster", "default"),
)

if err := u.cluster.upstreamServiceRelation.Create(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil {
if err := u.cluster.upstreamServiceRelation.Create(ctx, obj.Name); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
}
if err := u.cluster.HasSynced(ctx); err != nil {
Expand Down Expand Up @@ -187,9 +187,6 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
return err
}
}
if err := u.cluster.upstreamServiceRelation.Delete(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil {
log.Errorf("failed to delete upstreamService in cache: %s", err)
}
url := u.url + "/" + obj.ID
if err := u.cluster.deleteResource(ctx, url, "upstream"); err != nil {
u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
Expand All @@ -207,7 +204,7 @@ func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upst
zap.String("url", u.url),
)

if err := u.cluster.upstreamServiceRelation.Create(ctx, &v1.UpstreamServiceRelation{UpstreamName: obj.Name}); err != nil {
if err := u.cluster.upstreamServiceRelation.Create(ctx, obj.Name); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
}
if err := u.cluster.HasSynced(ctx); err != nil {
Expand Down
101 changes: 47 additions & 54 deletions pkg/apisix/upstreamservicerelation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apisix
import (
"context"
"fmt"
"strconv"
"strings"

"go.uber.org/zap"
Expand All @@ -26,6 +27,11 @@ import (
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

// to do: Delete one of the upstreams. Currently, only service is deleted. There will be some
// redundant upstream objects, but the results will not be affected. It is hoped that the service controller
// can complete the update nodes logic to avoid the intrusion of relation modules into more code.

// Maintain relationships only when resolveGranularity is endpoint
// There is no need to ensure the consistency between the upstream to services, only need to ensure that the upstream-node can be delete after deleting the service
type upstreamService struct {
cluster *cluster
Expand All @@ -37,79 +43,78 @@ func newUpstreamServiceRelation(c *cluster) *upstreamService {
}
}

func (u *upstreamService) Get(ctx context.Context, svcId string) (*v1.UpstreamServiceRelation, error) {
func (u *upstreamService) Get(ctx context.Context, serviceName string) (*v1.UpstreamServiceRelation, error) {
log.Debugw("try to get upstreamService in cache",
zap.String("svcId", svcId),
zap.String("service_name", serviceName),
zap.String("cluster", "default"),
)
us, err := u.cluster.cache.GetUpstreamServiceRelation(svcId)
if err == nil {
return us, nil
}
if err != cache.ErrNotFound {
us, err := u.cluster.cache.GetUpstreamServiceRelation(serviceName)
if err != nil && err != cache.ErrNotFound {
log.Error("failed to find upstreamService in cache",
zap.String("svcId", svcId), zap.Error(err))
} else {
log.Debugw("failed to find upstreamService in cache",
zap.String("svcId", svcId), zap.Error(err))
zap.String("service_name", serviceName), zap.Error(err))
return nil, err
}
return nil, err
return us, err
}

func (u *upstreamService) Delete(ctx context.Context, relation *v1.UpstreamServiceRelation) error {
func (u *upstreamService) Delete(ctx context.Context, serviceName string) error {
log.Debugw("try to delete upstreamService in cache",
zap.String("cluster", "default"),
)
u.initUpstreamServiceRelation(relation)
if relation == nil || relation.ServiceName == "" && relation.UpstreamName == "" {
return fmt.Errorf("UpstreamServiceRelation is empty object")
}
if relation.UpstreamName != "" {
err := u.cluster.cache.DeleteUpstreamServiceRelation(relation)
if err != nil {
return err
}
} else {
usr, err := u.cluster.cache.GetUpstreamServiceRelation(relation.ServiceName)
if err != nil {
return err
relation, err := u.Get(ctx, serviceName)
if err != nil {
if err == cache.ErrNotFound {
return nil
}
ups, err := u.cluster.upstream.Get(ctx, usr.UpstreamName)
return err
}
_ = u.cluster.cache.DeleteUpstreamServiceRelation(relation)
for upsName := range relation.UpstreamNames {
ups, err := u.cluster.upstream.Get(ctx, upsName)
if err != nil {
return err
continue
}
ups.Nodes = make(v1.UpstreamNodes, 0)
_, err = u.cluster.upstream.Update(ctx, ups)
if err != nil {
return err
}
err = u.cluster.cache.DeleteUpstreamServiceRelation(usr)
if err != nil {
return err
continue
}
}
return nil
}

func (u *upstreamService) Create(ctx context.Context, relation *v1.UpstreamServiceRelation) error {
func (u *upstreamService) Create(ctx context.Context, upstreamName string) error {
log.Debugw("try to create upstreamService in cache",
zap.String("cluster", "default"),
)
u.initUpstreamServiceRelation(relation)
if relation == nil || relation.ServiceName == "" || relation.UpstreamName == "" {
log.Error("UpstreamServiceRelation object ")

args := strings.Split(upstreamName, "_")
if len(args) < 2 {
return fmt.Errorf("wrong upstream name %s, must contains namespace_name", upstreamName)
}
// The last part of upstreanName should be a port number.
// Please refer to apisixv1.ComposeUpstreamName to see the detailed format.
_, err := strconv.Atoi(args[len(args)-1])
if err != nil {
return nil
}
us, err := u.cluster.cache.GetUpstreamServiceRelation(relation.ServiceName)

serviceName := args[0] + "_" + args[1]
relation, err := u.Get(ctx, serviceName)
if err != nil && err != cache.ErrNotFound {
return err
}
if us != nil {
us.UpstreamName = relation.UpstreamName
if relation == nil {
relation = &v1.UpstreamServiceRelation{
ServiceName: serviceName,
UpstreamNames: map[string]struct{}{
upstreamName: {},
},
}
} else {
us = relation
relation.UpstreamNames[upstreamName] = struct{}{}
}
if err := u.cluster.cache.InsertUpstreamServiceRelation(us); err != nil {
if err := u.cluster.cache.InsertUpstreamServiceRelation(relation); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
return err
}
Expand All @@ -129,15 +134,3 @@ func (u *upstreamService) List(ctx context.Context) ([]*v1.UpstreamServiceRelati
}
return usrs, nil
}

func (u *upstreamService) initUpstreamServiceRelation(us *v1.UpstreamServiceRelation) {
if us.UpstreamName == "" || us.ServiceName != "" {
return
}
args := strings.Split(us.UpstreamName, "_")
// namespace_service_subset_port
if len(args) < 2 {
return
}
us.ServiceName = args[0] + "_" + args[1]
}
Loading