Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion controllers/route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var routeTypeToFinalizer = map[core.RouteType]string{
Expand Down Expand Up @@ -109,7 +111,7 @@ func RegisterAllRouteControllers(
svcImportEventHandler := eventhandlers.NewServiceImportEventHandler(log, mgrClient)

builder := ctrl.NewControllerManagedBy(mgr).
For(routeInfo.gatewayApiType).
For(routeInfo.gatewayApiType, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1beta1.Gateway{}}, gwEventHandler).
Watches(&source.Kind{Type: &corev1.Service{}}, svcEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &mcsv1alpha1.ServiceImport{}}, svcImportEventHandler.MapToRoute(routeInfo.routeType)).
Expand Down Expand Up @@ -337,6 +339,21 @@ func (r *routeReconciler) reconcileUpsert(ctx context.Context, req ctrl.Request,
}

if _, err := r.buildAndDeployModel(ctx, route); err != nil {
if services.IsConflictError(err) {
// Stop reconciliation of this route if the route cannot be owned / has conflict
route.Status().UpdateParentRefs(route.Spec().ParentRefs()[0], config.LatticeGatewayControllerName)
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionFalse,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: "Conflicted",
Message: err.Error(),
})
if err = r.client.Status().Update(ctx, route.K8sObject()); err != nil {
return fmt.Errorf("failed to update route status for conflict due to err %w", err)
}
return nil
}
return err
}

Expand Down
1 change: 1 addition & 0 deletions controllers/route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func TestRouteReconciler_ReconcileCreates(t *testing.T) {
ClusterName: config.ClusterName,
}).AnyTimes()
mockCloud.EXPECT().DefaultTags().Return(mocks.Tags{}).AnyTimes()
mockCloud.EXPECT().DefaultTagsMergedWith(gomock.Any()).Return(mocks.Tags{}).AnyTimes()

// we expect a fair number of lattice calls
mockLattice.EXPECT().FindServiceNetwork(ctx, gomock.Any(), gomock.Any()).Return(
Expand Down
58 changes: 49 additions & 9 deletions pkg/aws/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/aws/aws-sdk-go/service/vpclattice"
"golang.org/x/exp/maps"

"context"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)
Expand Down Expand Up @@ -36,11 +37,14 @@ type Cloud interface {
// creates lattice tags with default values populated and merges them with provided tags
DefaultTagsMergedWith(services.Tags) services.Tags

// check if tags map has managedBy tag
// Retrieve tags and check if tags map has managedBy tag
ContainsManagedBy(tags services.Tags) bool

// check if managedBy tag set for lattice resource
IsArnManaged(arn string) (bool, error)
IsArnManaged(ctx context.Context, arn string) (bool, error)

// check ownership and acquire if it is not owned by anyone.
CheckAndAcquireOwnershipFromTags(ctx context.Context, arn string, tags services.Tags) (bool, error)
}

// NewCloud constructs new Cloud implementation.
Expand Down Expand Up @@ -107,22 +111,58 @@ func (c *defaultCloud) DefaultTagsMergedWith(tags services.Tags) services.Tags {
return newTags
}

func (c *defaultCloud) ContainsManagedBy(tags services.Tags) bool {
func (c *defaultCloud) getManagedByFromTags(tags services.Tags) string {
tag, ok := tags[TagManagedBy]
if !ok || tag == nil {
return false
return ""
}
return *tag == c.managedByTag
return *tag
}

func (c *defaultCloud) IsArnManaged(arn string) (bool, error) {
func (c *defaultCloud) getManagedBy(ctx context.Context, arn string) (string, error) {
tagsReq := &vpclattice.ListTagsForResourceInput{ResourceArn: &arn}
resp, err := c.lattice.ListTagsForResource(tagsReq)
resp, err := c.lattice.ListTagsForResourceWithContext(ctx, tagsReq)
if err != nil {
return "", err
}
return c.getManagedByFromTags(resp.Tags), nil
}

func (c *defaultCloud) ContainsManagedBy(tags services.Tags) bool {
return c.isOwner(c.getManagedByFromTags(tags))
}

func (c *defaultCloud) IsArnManaged(ctx context.Context, arn string) (bool, error) {
managedBy, err := c.getManagedBy(ctx, arn)
if err != nil {
return false, nil
}
isManaged := c.ContainsManagedBy(resp.Tags)
return isManaged, nil
return c.isOwner(managedBy), nil
}

func (c *defaultCloud) CheckAndAcquireOwnershipFromTags(ctx context.Context, arn string, tags services.Tags) (bool, error) {
// For resources that need backwards compatibility - not having managedBy is considered as owned by controller.
managedBy := c.getManagedByFromTags(tags)
if managedBy == "" {
err := c.acquireOwnership(ctx, arn)
if err != nil {
return false, err
}
return true, nil
}
return c.isOwner(managedBy), nil
}

func (c *defaultCloud) acquireOwnership(ctx context.Context, arn string) error {
_, err := c.Lattice().TagResourceWithContext(ctx, &vpclattice.TagResourceInput{
ResourceArn: &arn,
Tags: c.DefaultTags(),
})
return err
}

func (c *defaultCloud) isOwner(managedBy string) bool {
return managedBy == c.managedByTag
}

func getManagedByTag(cfg CloudConfig) string {
Expand Down
24 changes: 20 additions & 4 deletions pkg/aws/cloud_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 74 additions & 9 deletions pkg/aws/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"context"
"fmt"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
)

Expand Down Expand Up @@ -46,38 +48,38 @@ func TestIsArnManaged(t *testing.T) {

t.Run("arn sent", func(t *testing.T) {
arn := "arn"
mockLattice.EXPECT().ListTagsForResource(gomock.Any()).
mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()).
DoAndReturn(
func(req *vpclattice.ListTagsForResourceInput) (*vpclattice.ListTagsForResourceOutput, error) {
func(_ context.Context, req *vpclattice.ListTagsForResourceInput, _ ...interface{}) (*vpclattice.ListTagsForResourceOutput, error) {
assert.Equal(t, arn, *req.ResourceArn)
return &vpclattice.ListTagsForResourceOutput{}, nil
})
cl.IsArnManaged(arn)
cl.IsArnManaged(context.Background(), arn)
})

t.Run("is managed", func(t *testing.T) {
arn := "arn"
mockLattice.EXPECT().ListTagsForResource(gomock.Any()).
mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()).
Return(&vpclattice.ListTagsForResourceOutput{
Tags: cl.DefaultTags(),
}, nil)
managed, err := cl.IsArnManaged(arn)
managed, err := cl.IsArnManaged(context.Background(), arn)
assert.Nil(t, err)
assert.True(t, managed)
})

t.Run("not managed", func(t *testing.T) {
mockLattice.EXPECT().ListTagsForResource(gomock.Any()).
mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()).
Return(&vpclattice.ListTagsForResourceOutput{}, nil)
managed, err := cl.IsArnManaged("arn")
managed, err := cl.IsArnManaged(context.Background(), "arn")
assert.Nil(t, err)
assert.False(t, managed)
})

t.Run("error", func(t *testing.T) {
mockLattice.EXPECT().ListTagsForResource(gomock.Any()).
mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()).
Return(nil, errors.New(":("))
managed, err := cl.IsArnManaged("arn")
managed, err := cl.IsArnManaged(context.Background(), "arn")
assert.Nil(t, err)
assert.False(t, managed)
})
Expand Down Expand Up @@ -129,3 +131,66 @@ func Test_DefaultTagsMergedWith(t *testing.T) {
assert.Equal(t, expected, actual)
})
}

func Test_CheckAndAcquireOwnershipFromTags(t *testing.T) {
c := gomock.NewController(t)
defer c.Finish()

mockLattice := services.NewMockLattice(c)
cfg := CloudConfig{VpcId: "vpc-id", AccountId: "account-id", ClusterName: "cluster"}
cloud := NewDefaultCloud(mockLattice, cfg)

tcs := []struct {
name string
tags services.Tags
owned bool
tryAcquire bool
isErr bool
}{
{
name: "no ownership tag acquires ownership",
tags: services.Tags{},
owned: true,
tryAcquire: true,
isErr: false,
},
{
name: "proper ownership tag considered valid",
tags: cloud.DefaultTags(),
owned: true,
tryAcquire: false,
isErr: false,
},
{
name: "improper ownership tag considered invalid",
tags: services.Tags{
TagManagedBy: aws.String("not/this/owner"),
},
owned: false,
tryAcquire: false,
isErr: false,
},
}

for i, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
arn := fmt.Sprintf("arn-%d", i)

tagResourceCallCount := 0
if tc.tryAcquire {
tagResourceCallCount = 1
}
mockLattice.EXPECT().TagResourceWithContext(gomock.Any(), &vpclattice.TagResourceInput{ResourceArn: aws.String(arn), Tags: cloud.DefaultTags()}).
Return(&vpclattice.TagResourceOutput{}, nil).Times(tagResourceCallCount)

res, err := cloud.CheckAndAcquireOwnershipFromTags(context.Background(), arn, tc.tags)

assert.Equal(t, tc.owned, res)
if tc.isErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
8 changes: 4 additions & 4 deletions pkg/deploy/externaldns/dnsendpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func NewDnsEndpointManager(log gwlog.Logger, k8sClient client.Client) *defaultDn

func (s *defaultDnsEndpointManager) Create(ctx context.Context, service *latticemodel.Service) error {
namespacedName := types.NamespacedName{
Namespace: service.Spec.Namespace,
Name: service.Spec.Name + "-dns",
Namespace: service.Spec.RouteNamespace,
Name: service.Spec.RouteName + "-dns",
}
if service.Spec.CustomerDomainName == "" {
s.log.Debugf("Skipping creation of %s: detected no custom domain", namespacedName)
Expand All @@ -52,8 +52,8 @@ func (s *defaultDnsEndpointManager) Create(ctx context.Context, service *lattice
err error
)
routeNamespacedName := types.NamespacedName{
Namespace: service.Spec.Namespace,
Name: service.Spec.Name,
Namespace: service.Spec.RouteNamespace,
Name: service.Spec.RouteName,
}
if service.Spec.RouteType == core.GrpcRouteType {
route, err = core.GetGRPCRoute(ctx, s.k8sClient, routeNamespacedName)
Expand Down
Loading