Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/aws-application-networking-k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func main() {
setupLog.Fatalf("iam auth policy controller setup failed: %s", err)
}

err = controllers.RegisterVpcAssociationPolicyController(ctrlLog.Named("vpc-association-policy"), mgr, cloud)
err = controllers.RegisterVpcAssociationPolicyController(ctrlLog.Named("vpc-association-policy"), cloud, finalizerManager, mgr)
if err != nil {
setupLog.Fatalf("vpc association policy controller setup failed: %s", err)
}
Expand Down
56 changes: 27 additions & 29 deletions controllers/vpcassociationpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,37 @@ import (
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"github.com/aws/aws-application-networking-k8s/controllers/eventhandlers"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

const (
finalizer = "vpcassociationpolicies.application-networking.k8s.aws/resources"
)

type vpcAssociationPolicyReconciler struct {
log gwlog.Logger
client client.Client
cloud pkg_aws.Cloud
manager deploy.ServiceNetworkManager
log gwlog.Logger
client client.Client
cloud pkg_aws.Cloud
finalizerManager k8s.FinalizerManager
manager deploy.ServiceNetworkManager
}

func RegisterVpcAssociationPolicyController(log gwlog.Logger, mgr ctrl.Manager, cloud pkg_aws.Cloud) error {
func RegisterVpcAssociationPolicyController(log gwlog.Logger, cloud pkg_aws.Cloud, finalizerManager k8s.FinalizerManager, mgr ctrl.Manager) error {
controller := &vpcAssociationPolicyReconciler{
log: log,
client: mgr.GetClient(),
cloud: cloud,
manager: deploy.NewDefaultServiceNetworkManager(log, cloud),
log: log,
client: mgr.GetClient(),
cloud: cloud,
finalizerManager: finalizerManager,
manager: deploy.NewDefaultServiceNetworkManager(log, cloud),
}

eh := eventhandlers.NewPolicyEventHandler(log, mgr.GetClient(), &anv1alpha1.VpcAssociationPolicy{})
Expand Down Expand Up @@ -77,11 +83,6 @@ func (c *vpcAssociationPolicyReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
}

err = c.handleFinalizer(ctx, k8sPolicy)
if err != nil {
return ctrl.Result{}, err
}

c.log.Infow("reconciled vpc association policy",
"req", req,
"targetRef", k8sPolicy.Spec.TargetRef,
Expand All @@ -90,21 +91,11 @@ func (c *vpcAssociationPolicyReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, nil
}

func (c *vpcAssociationPolicyReconciler) handleFinalizer(ctx context.Context, k8sPolicy *anv1alpha1.VpcAssociationPolicy) error {
finalizer := "vpcassociationpolicies.application-networking.k8s.aws/resources"
if k8sPolicy.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(k8sPolicy, finalizer) {
controllerutil.AddFinalizer(k8sPolicy, finalizer)
}
} else {
if controllerutil.ContainsFinalizer(k8sPolicy, finalizer) {
controllerutil.RemoveFinalizer(k8sPolicy, finalizer)
}
}
return c.client.Update(ctx, k8sPolicy)
}

func (c *vpcAssociationPolicyReconciler) upsert(ctx context.Context, k8sPolicy *anv1alpha1.VpcAssociationPolicy) error {
err := c.finalizerManager.AddFinalizers(ctx, k8sPolicy, finalizer)
if err != nil {
return err
}
snName := string(k8sPolicy.Spec.TargetRef.Name)
sgIds := utils.SliceMap(k8sPolicy.Spec.SecurityGroupIds, func(sg anv1alpha1.SecurityGroupId) *string {
str := string(sg)
Expand Down Expand Up @@ -132,6 +123,10 @@ func (c *vpcAssociationPolicyReconciler) delete(ctx context.Context, k8sPolicy *
if err != nil {
return c.handleDeleteError(err)
}
err = c.finalizerManager.RemoveFinalizers(ctx, k8sPolicy, finalizer)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -169,6 +164,9 @@ func (c *vpcAssociationPolicyReconciler) handleDeleteError(err error) error {
}

func (c *vpcAssociationPolicyReconciler) updateLatticeAnnotation(ctx context.Context, k8sPolicy *anv1alpha1.VpcAssociationPolicy, resArn string) error {
if k8sPolicy.Annotations == nil {
k8sPolicy.Annotations = make(map[string]string)
}
k8sPolicy.Annotations["application-networking.k8s.aws/resourceArn"] = resArn
err := c.client.Update(ctx, k8sPolicy)
return err
Expand Down
3 changes: 2 additions & 1 deletion pkg/deploy/lattice/service_network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (m *defaultServiceNetworkManager) CreateOrUpdate(ctx context.Context, servi
}
resp, err := vpcLatticeSess.CreateServiceNetworkWithContext(ctx, &serviceNetworkInput)
if err != nil {
return model.ServiceNetworkStatus{ServiceNetworkARN: "", ServiceNetworkID: ""}, err
return model.ServiceNetworkStatus{}, err
}

serviceNetworkId = aws.StringValue(resp.Id)
Expand All @@ -201,6 +201,7 @@ func (m *defaultServiceNetworkManager) CreateOrUpdate(ctx context.Context, servi
createServiceNetworkVpcAssociationInput := vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &serviceNetworkId,
VpcIdentifier: &config.VpcID,
Tags: m.cloud.DefaultTags(),
}
_, err = vpcLatticeSess.CreateServiceNetworkVpcAssociationWithContext(ctx, &createServiceNetworkVpcAssociationInput)
if err != nil {
Expand Down
22 changes: 14 additions & 8 deletions pkg/deploy/lattice/service_network_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func Test_CreateOrUpdateServiceNetwork_SnNotExist_NeedToAssociate(t *testing.T)
createServiceNetworkVpcAssociationInput := &vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &snId,
VpcIdentifier: &config.VpcID,
Tags: cloud.DefaultTags(),
}
associationStatus := vpclattice.ServiceNetworkVpcAssociationStatusActive
createServiceNetworkVPCAssociationOutput := &vpclattice.CreateServiceNetworkVpcAssociationOutput{
Expand Down Expand Up @@ -286,10 +287,6 @@ func Test_CreateOrUpdateServiceNetwork_SnAlreadyExist_ServiceNetworkVpcAssociati
createServiceNetworkVPCAssociationOutput := &vpclattice.CreateServiceNetworkVpcAssociationOutput{
Status: &associationStatus,
}
createServiceNetworkVpcAssociationInput := &vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &snId,
VpcIdentifier: &config.VpcID,
}

c := gomock.NewController(t)
defer c.Finish()
Expand All @@ -305,6 +302,12 @@ func Test_CreateOrUpdateServiceNetwork_SnAlreadyExist_ServiceNetworkVpcAssociati
SvcNetwork: item,
Tags: snTagsOuput.Tags,
}, nil)

createServiceNetworkVpcAssociationInput := &vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &snId,
VpcIdentifier: &config.VpcID,
Tags: cloud.DefaultTags(),
}
mockLattice.EXPECT().CreateServiceNetworkVpcAssociationWithContext(ctx, createServiceNetworkVpcAssociationInput).Return(createServiceNetworkVPCAssociationOutput, nil)

snMgr := NewDefaultServiceNetworkManager(gwlog.FallbackLogger, cloud)
Expand Down Expand Up @@ -349,10 +352,6 @@ func Test_CreateOrUpdateServiceNetwork_SnAlreadyExist_SnAssociatedWithOtherVPC(t
createServiceNetworkVPCAssociationOutput := &vpclattice.CreateServiceNetworkVpcAssociationOutput{
Status: &associationStatus,
}
createServiceNetworkVpcAssociationInput := &vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &snId,
VpcIdentifier: &config.VpcID,
}

c := gomock.NewController(t)
defer c.Finish()
Expand All @@ -368,6 +367,12 @@ func Test_CreateOrUpdateServiceNetwork_SnAlreadyExist_SnAssociatedWithOtherVPC(t
SvcNetwork: item,
Tags: snTagsOuput.Tags,
}, nil)

createServiceNetworkVpcAssociationInput := &vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &snId,
VpcIdentifier: &config.VpcID,
Tags: cloud.DefaultTags(),
}
mockLattice.EXPECT().CreateServiceNetworkVpcAssociationWithContext(ctx, createServiceNetworkVpcAssociationInput).Return(createServiceNetworkVPCAssociationOutput, nil)

snMgr := NewDefaultServiceNetworkManager(gwlog.FallbackLogger, cloud)
Expand Down Expand Up @@ -410,6 +415,7 @@ func Test_CreateOrUpdateServiceNetwork_SnNotExist_ServiceNetworkVpcAssociationRe
createServiceNetworkVpcAssociationInput := &vpclattice.CreateServiceNetworkVpcAssociationInput{
ServiceNetworkIdentifier: &snId,
VpcIdentifier: &config.VpcID,
Tags: cloud.DefaultTags(),
}

mockLattice.EXPECT().FindServiceNetwork(ctx, gomock.Any(), gomock.Any()).Return(nil, nil)
Expand Down
82 changes: 41 additions & 41 deletions pkg/deploy/lattice/target_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,26 +248,27 @@ type tgListOutput struct {
func (s *defaultTargetGroupManager) List(ctx context.Context) ([]tgListOutput, error) {
lattice := s.cloud.Lattice()
var tgList []tgListOutput
targetGroupListInput := vpclattice.ListTargetGroupsInput{
VpcIdentifier: aws.String(config.VpcID),
TargetGroupType: aws.String(vpclattice.TargetGroupTypeIp),
}
targetGroupListInput := vpclattice.ListTargetGroupsInput{}
resp, err := lattice.ListTargetGroupsAsList(ctx, &targetGroupListInput)
if err != nil {
return nil, err
}
if len(resp) == 0 {
return nil, nil
}
tgArns := utils.SliceMap(resp, func(tg *vpclattice.TargetGroupSummary) string {
validTgs := utils.SliceFilter(resp, func(tg *vpclattice.TargetGroupSummary) bool {
return aws.StringValue(tg.VpcIdentifier) == config.VpcID &&
aws.StringValue(tg.Type) == vpclattice.TargetGroupTypeIp
})
tgArns := utils.SliceMap(validTgs, func(tg *vpclattice.TargetGroupSummary) string {
return aws.StringValue(tg.Arn)
})
tgArnToTagsMap, err := s.cloud.Tagging().GetTagsForArns(ctx, tgArns)

if err != nil {
return nil, err
}
for _, tg := range resp {
for _, tg := range validTgs {
tgList = append(tgList, tgListOutput{
tgSummary: tg,
tags: tgArnToTagsMap[*tg.Arn],
Expand All @@ -288,44 +289,43 @@ func (s *defaultTargetGroupManager) findTargetGroup(
if len(arns) == 0 {
return nil, nil
}
// Tag fields guarantee one result, as there can be only one target group for one service/route combination.
// We move forward but log this situation to help troubleshooting
if len(arns) > 1 {
s.log.Warnw("Target groups with conflicting tags found", "arns", arns)
}
arn := arns[0]

latticeTg, err := s.cloud.Lattice().GetTargetGroupWithContext(ctx, &vpclattice.GetTargetGroupInput{
TargetGroupIdentifier: &arn,
})
if err != nil {
return nil, services.IgnoreNotFound(err)
}
for _, arn := range arns {
latticeTg, err := s.cloud.Lattice().GetTargetGroupWithContext(ctx, &vpclattice.GetTargetGroupInput{
TargetGroupIdentifier: &arn,
})
if err != nil {
if services.IsNotFoundError(err) {
continue
}
return nil, err
}

// we ignore create failed status, so may as well check for it first
status := aws.StringValue(latticeTg.Status)
if status == vpclattice.TargetGroupStatusCreateFailed {
return nil, nil
}
// we ignore create failed status, so may as well check for it first
status := aws.StringValue(latticeTg.Status)
if status == vpclattice.TargetGroupStatusCreateFailed {
continue
}

// Double-check the immutable fields to ensure TG is valid
match, err := s.IsTargetGroupMatch(ctx, modelTargetGroup, &vpclattice.TargetGroupSummary{
Arn: latticeTg.Arn,
Port: latticeTg.Config.Port,
Protocol: latticeTg.Config.Protocol,
IpAddressType: latticeTg.Config.IpAddressType,
Type: latticeTg.Type,
VpcIdentifier: latticeTg.Config.VpcIdentifier,
}, nil) // we already know that tags match
if err != nil {
return nil, err
}
if match {
switch status {
case vpclattice.TargetGroupStatusCreateInProgress, vpclattice.TargetGroupStatusDeleteInProgress:
return nil, errors.New(LATTICE_RETRY)
case vpclattice.TargetGroupStatusDeleteFailed, vpclattice.TargetGroupStatusActive:
return latticeTg, nil
// Check the immutable fields to ensure TG is valid
match, err := s.IsTargetGroupMatch(ctx, modelTargetGroup, &vpclattice.TargetGroupSummary{
Arn: latticeTg.Arn,
Port: latticeTg.Config.Port,
Protocol: latticeTg.Config.Protocol,
IpAddressType: latticeTg.Config.IpAddressType,
Type: latticeTg.Type,
VpcIdentifier: latticeTg.Config.VpcIdentifier,
}, nil) // we already know that tags match
Copy link
Contributor

@zijun726911 zijun726911 Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering should we add a Protocol(or other tg fields) tags for TargetGroup?
So that to make sure one FindResourcesByTags() api call could definitely exactly match one TG?

(not in this PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it is necessary. Actually.. I wanted to remove as many tags as possible. I "had to" add protocolVersion in the tag because it is not available in TGSummary, so it needed extra Get() call to be figured out. Ideally VPC Lattice should just return protocolVersion in the summary instead.

if err != nil {
return nil, err
}
if match {
switch status {
case vpclattice.TargetGroupStatusCreateInProgress, vpclattice.TargetGroupStatusDeleteInProgress:
return nil, errors.New(LATTICE_RETRY)
case vpclattice.TargetGroupStatusDeleteFailed, vpclattice.TargetGroupStatusActive:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to return err for TargetGroupStatusDeleteFailed tg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, they do exist.

return latticeTg, nil
}
}
}

Expand Down
19 changes: 13 additions & 6 deletions pkg/deploy/lattice/target_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,16 +777,23 @@ func Test_ListTG_TGsExist(t *testing.T) {
arn := "123456789"
id := "123456789"
name1 := "test1"
config.VpcID = "vpc-id"
config.ClusterName = "cluster-name"
tgType := vpclattice.TargetGroupTypeIp
tg1 := &vpclattice.TargetGroupSummary{
Arn: &arn,
Id: &id,
Name: &name1,
Arn: &arn,
Id: &id,
Name: &name1,
VpcIdentifier: &config.VpcID,
Type: &tgType,
}
name2 := "test2"
tg2 := &vpclattice.TargetGroupSummary{
Arn: &arn,
Id: &id,
Name: &name2,
Arn: &arn,
Id: &id,
Name: &name2,
VpcIdentifier: &config.VpcID,
Type: &tgType,
}
listTGOutput := []*vpclattice.TargetGroupSummary{tg1, tg2}

Expand Down
Loading