diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index 8b7d711a..1a31f9c8 100644 --- a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -32,6 +32,7 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 + - run: sed -En 's/^go[[:space:]]+([[:digit:].]+)$/GO_VERSION=\1/p' go.mod >> $GITHUB_ENV - name: Setup Go uses: actions/setup-go@v4 with: diff --git a/.github/workflows/presubmit.yaml b/.github/workflows/presubmit.yaml index 830e4c76..340492c9 100644 --- a/.github/workflows/presubmit.yaml +++ b/.github/workflows/presubmit.yaml @@ -11,6 +11,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + - run: sed -En 's/^go[[:space:]]+([[:digit:].]+)$/GO_VERSION=\1/p' go.mod >> $GITHUB_ENV - uses: actions/setup-go@v4 with: go-version: ${{ env.GO_VERSION }} diff --git a/pkg/controllers/route_controller_test.go b/pkg/controllers/route_controller_test.go index 7d4e446a..38920d64 100644 --- a/pkg/controllers/route_controller_test.go +++ b/pkg/controllers/route_controller_test.go @@ -174,6 +174,17 @@ func TestRouteReconciler_ReconcileCreates(t *testing.T) { // we expect a fair number of lattice calls mockLattice.EXPECT().ListTargetsAsList(ctx, gomock.Any()).Return( []*vpclattice.TargetSummary{}, nil) + mockLattice.EXPECT().ListTargetsAsList(ctx, gomock.Any()).Return( + []*vpclattice.TargetSummary{ + { + Id: aws.String("192.0.2.22"), + Port: aws.Int64(8090), + }, + { + Id: aws.String("192.0.2.33"), + Port: aws.Int64(8090), + }, + }, nil) mockLattice.EXPECT().RegisterTargetsWithContext(ctx, gomock.Any()).Return( &vpclattice.RegisterTargetsOutput{ Successful: []*vpclattice.Target{ diff --git a/pkg/deploy/lattice/targets_manager.go b/pkg/deploy/lattice/targets_manager.go index c3714da4..42cb16ca 100644 --- a/pkg/deploy/lattice/targets_manager.go +++ b/pkg/deploy/lattice/targets_manager.go @@ -5,13 +5,13 @@ import ( "errors" "fmt" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/vpclattice" 
pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" "github.com/aws/aws-application-networking-k8s/pkg/utils" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" + "github.com/aws/aws-sdk-go/aws" ) const ( @@ -23,6 +23,7 @@ const ( //go:generate mockgen -destination targets_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice TargetsManager type TargetsManager interface { + List(ctx context.Context, modelTg *model.TargetGroup) ([]*vpclattice.TargetSummary, error) Update(ctx context.Context, modelTargets *model.Targets, modelTg *model.TargetGroup) error } @@ -41,6 +42,14 @@ func NewTargetsManager( } } +func (s *defaultTargetsManager) List(ctx context.Context, modelTg *model.TargetGroup) ([]*vpclattice.TargetSummary, error) { + lattice := s.cloud.Lattice() + listTargetsInput := vpclattice.ListTargetsInput{ + TargetGroupIdentifier: &modelTg.Status.Id, + } + return lattice.ListTargetsAsList(ctx, &listTargetsInput) +} + func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.Targets, modelTg *model.TargetGroup) error { if modelTg.Status == nil || modelTg.Status.Id == "" { return errors.New("model target group is missing id") @@ -50,44 +59,65 @@ func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model. modelTg.ID(), modelTargets.Spec.StackTargetGroupId) } - // Only take care of pods that are ready, for backwards compatibility. - // TODO: Pod readiness support. 
- modelTargets.Spec.TargetList = utils.SliceFilter(modelTargets.Spec.TargetList, func(t model.Target) bool { - return t.Ready - }) - s.log.Debugf("Creating targets for target group %s", modelTg.Status.Id) - lattice := s.cloud.Lattice() - listTargetsInput := vpclattice.ListTargetsInput{ - TargetGroupIdentifier: &modelTg.Status.Id, - } - listTargetsOutput, err := lattice.ListTargetsAsList(ctx, &listTargetsInput) + latticeTargets, err := s.List(ctx, modelTg) if err != nil { return err } + staleTargets := s.findStaleTargets(modelTargets, latticeTargets) - err1 := s.deregisterStaleTargets(ctx, modelTargets, modelTg, listTargetsOutput) - err2 := s.registerTargets(ctx, modelTargets, modelTg) + err1 := s.deregisterTargets(ctx, modelTg, staleTargets) + err2 := s.registerTargets(ctx, modelTg, modelTargets.Spec.TargetList) return errors.Join(err1, err2) } +func (s *defaultTargetsManager) findStaleTargets( + modelTargets *model.Targets, + listTargetsOutput []*vpclattice.TargetSummary) []model.Target { + + // Disregard readiness information, and use IP/Port as key. 
+ modelSet := utils.NewSet[model.Target]() + for _, target := range modelTargets.Spec.TargetList { + targetIpPort := model.Target{ + TargetIP: target.TargetIP, + Port: target.Port, + } + modelSet.Put(targetIpPort) + } + + staleTargets := make([]model.Target, 0) + for _, target := range listTargetsOutput { + ipPort := model.Target{ + TargetIP: aws.StringValue(target.Id), + Port: aws.Int64Value(target.Port), + } + if aws.StringValue(target.Status) != vpclattice.TargetStatusDraining && !modelSet.Contains(ipPort) { + staleTargets = append(staleTargets, ipPort) + } + } + return staleTargets +} + func (s *defaultTargetsManager) registerTargets( ctx context.Context, - modelTargets *model.Targets, modelTg *model.TargetGroup, + targets []model.Target, ) error { - latticeTargets := utils.SliceMap(modelTargets.Spec.TargetList, func(t model.Target) *vpclattice.Target { + if len(targets) == 0 { + return nil + } + latticeTargets := utils.SliceMap(targets, func(t model.Target) *vpclattice.Target { return &vpclattice.Target{Id: &t.TargetIP, Port: &t.Port} }) chunks := utils.Chunks(latticeTargets, maxTargetsPerLatticeTargetsApiCall) var registerTargetsError error - for i, targets := range chunks { - registerRouteInput := vpclattice.RegisterTargetsInput{ + for i, chunk := range chunks { + registerTargetsInput := vpclattice.RegisterTargetsInput{ TargetGroupIdentifier: &modelTg.Status.Id, - Targets: targets, + Targets: chunk, } - resp, err := s.cloud.Lattice().RegisterTargetsWithContext(ctx, ®isterRouteInput) + resp, err := s.cloud.Lattice().RegisterTargetsWithContext(ctx, ®isterTargetsInput) if err != nil { registerTargetsError = errors.Join(registerTargetsError, fmt.Errorf("Failed to register targets from VPC Lattice Target Group %s due to %s", modelTg.Status.Id, err)) } @@ -101,32 +131,24 @@ func (s *defaultTargetsManager) registerTargets( return registerTargetsError } -func (s *defaultTargetsManager) deregisterStaleTargets( +func (s *defaultTargetsManager) deregisterTargets( ctx 
context.Context, - modelTargets *model.Targets, modelTg *model.TargetGroup, - listTargetsOutput []*vpclattice.TargetSummary, + targets []model.Target, ) error { - var targetsToDeregister []*vpclattice.Target - for _, latticeTarget := range listTargetsOutput { - isStale := true - for _, t := range modelTargets.Spec.TargetList { - if (aws.StringValue(latticeTarget.Id) == t.TargetIP) && (aws.Int64Value(latticeTarget.Port) == t.Port) { - isStale = false - break - } - } - if isStale { - targetsToDeregister = append(targetsToDeregister, &vpclattice.Target{Id: latticeTarget.Id, Port: latticeTarget.Port}) - } + if len(targets) == 0 { + return nil } + latticeTargets := utils.SliceMap(targets, func(t model.Target) *vpclattice.Target { + return &vpclattice.Target{Id: &t.TargetIP, Port: &t.Port} + }) - chunks := utils.Chunks(targetsToDeregister, maxTargetsPerLatticeTargetsApiCall) + chunks := utils.Chunks(latticeTargets, maxTargetsPerLatticeTargetsApiCall) var deregisterTargetsError error - for i, targets := range chunks { + for i, chunk := range chunks { deregisterTargetsInput := vpclattice.DeregisterTargetsInput{ TargetGroupIdentifier: &modelTg.Status.Id, - Targets: targets, + Targets: chunk, } resp, err := s.cloud.Lattice().DeregisterTargetsWithContext(ctx, &deregisterTargetsInput) if err != nil { diff --git a/pkg/deploy/lattice/targets_manager_mock.go b/pkg/deploy/lattice/targets_manager_mock.go index 1421ecff..dc5b4f70 100644 --- a/pkg/deploy/lattice/targets_manager_mock.go +++ b/pkg/deploy/lattice/targets_manager_mock.go @@ -9,6 +9,7 @@ import ( reflect "reflect" lattice "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + vpclattice "github.com/aws/aws-sdk-go/service/vpclattice" gomock "github.com/golang/mock/gomock" ) @@ -35,6 +36,21 @@ func (m *MockTargetsManager) EXPECT() *MockTargetsManagerMockRecorder { return m.recorder } +// List mocks base method. 
+func (m *MockTargetsManager) List(arg0 context.Context, arg1 *lattice.TargetGroup) ([]*vpclattice.TargetSummary, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "List", arg0, arg1) + ret0, _ := ret[0].([]*vpclattice.TargetSummary) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// List indicates an expected call of List. +func (mr *MockTargetsManagerMockRecorder) List(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockTargetsManager)(nil).List), arg0, arg1) +} + // Update mocks base method. func (m *MockTargetsManager) Update(arg0 context.Context, arg1 *lattice.Targets, arg2 *lattice.TargetGroup) error { m.ctrl.T.Helper() diff --git a/pkg/deploy/lattice/targets_synthesizer.go b/pkg/deploy/lattice/targets_synthesizer.go index e51ea680..a686578a 100644 --- a/pkg/deploy/lattice/targets_synthesizer.go +++ b/pkg/deploy/lattice/targets_synthesizer.go @@ -4,21 +4,37 @@ import ( "context" "fmt" - pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" "github.com/aws/aws-application-networking-k8s/pkg/model/core" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" + "github.com/aws/aws-application-networking-k8s/pkg/webhook" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/vpclattice" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + LatticeReadinessGateConditionType = webhook.PodReadinessGateConditionType + + ReadinessReasonHealthy = "Healthy" + ReadinessReasonUnhealthy = "Unhealthy" + ReadinessReasonUnused = "Unused" + ReadinessReasonInitial = "Initial" + ReadinessReasonHealthCheckUnavailable = "HealthCheckUnavailable" + ReadinessReasonTargetNotFound = "TargetNotFound" ) func NewTargetsSynthesizer( log gwlog.Logger, - cloud pkg_aws.Cloud, + client 
client.Client, tgManager TargetsManager, stack core.Stack, ) *targetsSynthesizer { return &targetsSynthesizer{ log: log, - cloud: cloud, + client: client, targetsManager: tgManager, stack: stack, } @@ -26,7 +42,7 @@ func NewTargetsSynthesizer( type targetsSynthesizer struct { log gwlog.Logger - cloud pkg_aws.Cloud + client client.Client targetsManager TargetsManager stack core.Stack } @@ -58,6 +74,119 @@ func (t *targetsSynthesizer) Synthesize(ctx context.Context) error { } func (t *targetsSynthesizer) PostSynthesize(ctx context.Context) error { - // nothing to do here + var resTargets []*model.Targets + err := t.stack.ListResources(&resTargets) + if err != nil { + t.log.Errorf("Failed to list targets due to %s", err) + } + + requeueNeeded := false + for _, targets := range resTargets { + tg := &model.TargetGroup{} + err := t.stack.GetResource(targets.Spec.StackTargetGroupId, tg) + if err != nil { + return err + } + + identifier := model.TgNamePrefix(tg.Spec) + if tg.Status != nil && tg.Status.Id != "" { + identifier = tg.Status.Id + } + + latticeTargets, err := t.targetsManager.List(ctx, tg) + if err != nil { + return fmt.Errorf("failed post-synthesize targets %s, ListTargets failure: %w", identifier, err) + } + + pending, err := t.syncStatus(ctx, targets.Spec.TargetList, latticeTargets) + if err != nil { + return fmt.Errorf("failed post-synthesize targets %s, condition sync failure: %w", identifier, err) + } + requeueNeeded = requeueNeeded || pending + } + + if requeueNeeded { + return fmt.Errorf("%w: target status still in pending", RetryErr) + } return nil } + +func (t *targetsSynthesizer) syncStatus(ctx context.Context, modelTargets []model.Target, latticeTargets []*vpclattice.TargetSummary) (bool, error) { + // Extract Lattice targets as a set + latticeTargetMap := make(map[model.Target]*vpclattice.TargetSummary) + + for _, latticeTarget := range latticeTargets { + ipPort := model.Target{ + TargetIP: aws.StringValue(latticeTarget.Id), + Port: 
aws.Int64Value(latticeTarget.Port), + } + latticeTargetMap[ipPort] = latticeTarget + } + + var requeue bool + for _, target := range modelTargets { + // Step 0: Check if the endpoint has a valid target, and is not ready yet. + if target.Ready || target.TargetRef.Name == "" { + continue + } + + // Step 1: Check if the pod has the readiness gate spec. + pod := &corev1.Pod{} + if err := t.client.Get(ctx, target.TargetRef, pod); err != nil { return requeue, err } + if !utils.PodHasReadinessGate(pod, LatticeReadinessGateConditionType) { + continue + } + + // Step 2: Check if the pod readiness condition exists with specific condition type. + // The condition is considered false when it does not exist. + cond := utils.FindPodStatusCondition(pod.Status.Conditions, LatticeReadinessGateConditionType) + if cond != nil && cond.Status == corev1.ConditionTrue { + continue + } + + // Step 3: Check if the Lattice target is healthy. + newCond := corev1.PodCondition{ + Type: LatticeReadinessGateConditionType, + Status: corev1.ConditionFalse, + } + targetIpPort := model.Target{ + TargetIP: target.TargetIP, + Port: target.Port, + } + // syncStatus is called at post synthesis, so we can assume: + // 1. Target for the pod (eventually) exists. If the target doesn't exist, we can simply requeue. + // 2. Target group will be always in use, except for ServiceExport TGs. + if latticeTarget, ok := latticeTargetMap[targetIpPort]; ok { + switch status := aws.StringValue(latticeTarget.Status); status { + case vpclattice.TargetStatusHealthy: + newCond.Status = corev1.ConditionTrue + newCond.Reason = ReadinessReasonHealthy + case vpclattice.TargetStatusUnavailable: + // Lattice HC not turned on. Readiness is designed to work only with HC but do not block deployment on this case. + newCond.Status = corev1.ConditionTrue + newCond.Reason = ReadinessReasonHealthCheckUnavailable + case vpclattice.TargetStatusUnused: + // Since this logic is called after HTTPRoute is wired, this only happens for ServiceExport TGs. 
+ // In this case we do not have to evaluate them as Healthy, but we also do not have to requeue. + newCond.Reason = ReadinessReasonUnused + case vpclattice.TargetStatusInitial: + requeue = true + newCond.Reason = ReadinessReasonInitial + default: + requeue = true + newCond.Reason = ReadinessReasonUnhealthy + newCond.Message = fmt.Sprintf("Target health check status: %s", status) + } + } else { + requeue = true + newCond.Reason = ReadinessReasonTargetNotFound + } + + // Step 4: Update status. + utils.SetPodStatusCondition(&pod.Status.Conditions, newCond) + if err := t.client.Status().Update(ctx, pod); err != nil { + return requeue, err + } + } + return requeue, nil +} diff --git a/pkg/deploy/lattice/targets_synthesizer_test.go b/pkg/deploy/lattice/targets_synthesizer_test.go index b36a1842..dd08a9f9 100644 --- a/pkg/deploy/lattice/targets_synthesizer_test.go +++ b/pkg/deploy/lattice/targets_synthesizer_test.go @@ -4,9 +4,16 @@ import ( "context" "github.com/aws/aws-application-networking-k8s/pkg/model/core" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/vpclattice" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + testclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "testing" ) @@ -61,3 +68,199 @@ func Test_SynthesizeTargets(t *testing.T) { err := synthesizer.Synthesize(ctx) assert.Nil(t, err) } + +func Test_PostSynthesize_Conditions(t *testing.T) { + + newPod := func(namespace, name string, hasGate bool, ready bool) *corev1.Pod { + var readinessGates []corev1.PodReadinessGate + if hasGate { + readinessGates = append(readinessGates, corev1.PodReadinessGate{ + ConditionType: LatticeReadinessGateConditionType, + 
}) + } + condition := corev1.PodCondition{ + Type: LatticeReadinessGateConditionType, + Status: corev1.ConditionFalse, + Reason: ReadinessReasonUnhealthy, + } + if ready { + condition = corev1.PodCondition{ + Type: LatticeReadinessGateConditionType, + Status: corev1.ConditionTrue, + Reason: ReadinessReasonHealthy, + } + } + + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: corev1.PodSpec{ + ReadinessGates: readinessGates, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{condition}, + }, + } + } + newLatticeTarget := func(ip string, port int64, status string) *vpclattice.TargetSummary { + return &vpclattice.TargetSummary{ + Id: aws.String(ip), + Port: aws.Int64(port), + Status: aws.String(status), + } + } + + target := model.Target{ + TargetIP: "10.10.1.1", + Port: 8675, + Ready: false, + TargetRef: types.NamespacedName{Namespace: "ns", Name: "pod1"}, + } + + tests := []struct { + name string + model model.Target + lattice *vpclattice.TargetSummary + pod *corev1.Pod + expectedStatus corev1.ConditionStatus + expectedReason string + requeue bool + }{ + { + name: "Healthy targets make pod ready", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusHealthy), + pod: newPod("ns", "pod1", true, false), + expectedStatus: corev1.ConditionTrue, + expectedReason: ReadinessReasonHealthy, + requeue: false, + }, + { + name: "Unavailable targets make pod ready", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusUnavailable), + pod: newPod("ns", "pod1", true, false), + expectedStatus: corev1.ConditionTrue, + expectedReason: ReadinessReasonHealthCheckUnavailable, + requeue: false, + }, + { + name: "Initial targets do not make pod ready", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusInitial), + pod: newPod("ns", "pod1", true, false), + 
expectedStatus: corev1.ConditionFalse, + expectedReason: ReadinessReasonInitial, + requeue: true, + }, + { + name: "Unhealthy targets do not make pod ready", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusUnhealthy), + pod: newPod("ns", "pod1", true, false), + expectedStatus: corev1.ConditionFalse, + expectedReason: ReadinessReasonUnhealthy, + requeue: true, + }, + { + name: "Draining(unhealthy) targets do not make pod ready", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusDraining), + pod: newPod("ns", "pod1", true, false), + expectedStatus: corev1.ConditionFalse, + expectedReason: ReadinessReasonUnhealthy, + requeue: true, + }, + { + name: "Requeues if target not found", + model: target, + lattice: newLatticeTarget("dummy", 8675, vpclattice.TargetStatusHealthy), + pod: newPod("ns", "pod1", true, false), + expectedStatus: corev1.ConditionFalse, + expectedReason: ReadinessReasonTargetNotFound, + requeue: true, + }, + { + name: "Pod without gate does not change condition", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusHealthy), + pod: newPod("ns", "pod1", false, false), + expectedStatus: corev1.ConditionFalse, + expectedReason: ReadinessReasonUnhealthy, + requeue: false, + }, + { + name: "Ready pods keep condition (even if target is unhealthy)", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusUnhealthy), + pod: newPod("ns", "pod1", true, true), + expectedStatus: corev1.ConditionTrue, + expectedReason: ReadinessReasonHealthy, + requeue: false, + }, + { + name: "Unused pods keep condition", + model: target, + lattice: newLatticeTarget("10.10.1.1", 8675, vpclattice.TargetStatusUnused), + pod: newPod("ns", "pod1", true, false), + expectedStatus: corev1.ConditionFalse, + expectedReason: ReadinessReasonUnused, + requeue: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := 
gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + + mockTargetsManager := NewMockTargetsManager(c) + + stack := core.NewDefaultStack(core.StackID{Name: "foo", Namespace: "bar"}) + + modelTg := model.TargetGroup{ + ResourceMeta: core.NewResourceMeta(stack, "AWS:VPCServiceNetwork::TargetGroup", "tg-stack-id"), + Status: &model.TargetGroupStatus{ + Name: "tg-name", + Arn: "tg-arn", + Id: "tg-id", + }, + } + assert.NoError(t, stack.AddResource(&modelTg)) + + targetsSpec := model.TargetsSpec{ + StackTargetGroupId: modelTg.ID(), + TargetList: []model.Target{tt.model}, + } + model.NewTargets(stack, targetsSpec) + + mockTargetsManager.EXPECT().List(ctx, gomock.Any()).Return([]*vpclattice.TargetSummary{tt.lattice}, nil) + + k8sClient := testclient.NewClientBuilder().Build() + assert.NoError(t, k8sClient.Create(ctx, tt.pod)) + + synthesizer := NewTargetsSynthesizer(gwlog.FallbackLogger, k8sClient, mockTargetsManager, stack) + err := synthesizer.PostSynthesize(ctx) + + if tt.requeue { + assert.ErrorIs(t, err, RetryErr) + } else { + assert.Nil(t, err) + } + + pod := &corev1.Pod{} + k8sClient.Get(ctx, types.NamespacedName{Namespace: "ns", Name: "pod1"}, pod) + cond := utils.FindPodStatusCondition(pod.Status.Conditions, LatticeReadinessGateConditionType) + assert.NotNil(t, cond) + + assert.Equal(t, tt.expectedReason, cond.Reason) + assert.Equal(t, tt.expectedStatus, cond.Status) + }) + } +} diff --git a/pkg/deploy/stack_deployer.go b/pkg/deploy/stack_deployer.go index b66d6309..241a6edb 100644 --- a/pkg/deploy/stack_deployer.go +++ b/pkg/deploy/stack_deployer.go @@ -184,7 +184,7 @@ func (gc *TgGc) cycle() { func (d *latticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { targetGroupSynthesizer := lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sClient, d.targetGroupManager, d.svcExportTgBuilder, d.svcBuilder, stack) - targetsSynthesizer := lattice.NewTargetsSynthesizer(d.log, d.cloud, d.targetsManager, stack) + 
targetsSynthesizer := lattice.NewTargetsSynthesizer(d.log, d.k8sClient, d.targetsManager, stack) serviceSynthesizer := lattice.NewServiceSynthesizer(d.log, d.latticeServiceManager, d.dnsEndpointManager, stack) listenerSynthesizer := lattice.NewListenerSynthesizer(d.log, d.listenerManager, stack) ruleSynthesizer := lattice.NewRuleSynthesizer(d.log, d.ruleManager, d.targetGroupManager, stack) @@ -225,6 +225,11 @@ func (d *latticeServiceStackDeployer) Deploy(ctx context.Context, stack core.Sta return fmt.Errorf("error during rule synthesis %w", err) } + // Handle pod status update for targets. + if err := targetsSynthesizer.PostSynthesize(ctx); err != nil { + return fmt.Errorf("error during target post synthesis %w", err) + } + //Handle targetGroup deletion request if err := targetGroupSynthesizer.SynthesizeDelete(ctx); err != nil { return fmt.Errorf("error during tg delete synthesis %w", err) @@ -261,9 +266,12 @@ func NewTargetGroupStackDeploy( } func (d *latticeTargetGroupStackDeployer) Deploy(ctx context.Context, stack core.Stack) error { + tgGc.lock.Lock() + defer tgGc.lock.Unlock() + synthesizers := []ResourceSynthesizer{ lattice.NewTargetGroupSynthesizer(d.log, d.cloud, d.k8sclient, d.targetGroupManager, d.svcExportTgBuilder, d.svcBuilder, stack), - lattice.NewTargetsSynthesizer(d.log, d.cloud, lattice.NewTargetsManager(d.log, d.cloud), stack), + lattice.NewTargetsSynthesizer(d.log, d.k8sclient, lattice.NewTargetsManager(d.log, d.cloud), stack), } return deploy(ctx, stack, synthesizers) } diff --git a/pkg/gateway/model_build_targets.go b/pkg/gateway/model_build_targets.go index cb47fe49..187bbc85 100644 --- a/pkg/gateway/model_build_targets.go +++ b/pkg/gateway/model_build_targets.go @@ -186,11 +186,19 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co if _, ok := servicePortNames[aws.StringValue(port.Name)]; ok || skipMatch { for _, ep := range epSlice.Endpoints { for _, address := range ep.Addresses { - 
targetList = append(targetList, model.Target{ + // Do not model terminating endpoints so that they can deregister. + if aws.BoolValue(ep.Conditions.Terminating) { + continue + } + target := model.Target{ TargetIP: address, Port: int64(aws.Int32Value(port.Port)), Ready: aws.BoolValue(ep.Conditions.Ready), - }) + } + if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" { + target.TargetRef = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name} + } + targetList = append(targetList, target) } } } diff --git a/pkg/gateway/model_build_targets_test.go b/pkg/gateway/model_build_targets_test.go index 3d52dccd..07b86bc0 100644 --- a/pkg/gateway/model_build_targets_test.go +++ b/pkg/gateway/model_build_targets_test.go @@ -64,12 +64,34 @@ func Test_Targets(t *testing.T) { Conditions: discoveryv1.EndpointConditions{ Ready: aws.Bool(true), }, + TargetRef: &corev1.ObjectReference{ + Namespace: "ns1", + Name: "pod1", + Kind: "Pod", + }, }, { Addresses: []string{"10.10.2.2"}, Conditions: discoveryv1.EndpointConditions{ Ready: aws.Bool(false), }, + TargetRef: &corev1.ObjectReference{ + Namespace: "ns1", + Name: "pod2", + Kind: "Pod", + }, + }, + { + Addresses: []string{"10.10.3.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: aws.Bool(false), + Terminating: aws.Bool(true), + }, + TargetRef: &corev1.ObjectReference{ + Namespace: "ns1", + Name: "pod3", + Kind: "Pod", + }, }, }, }, @@ -85,14 +107,16 @@ func Test_Targets(t *testing.T) { wantErrIsNil: true, expectedTargetList: []model.Target{ { - TargetIP: "10.10.1.1", - Port: 8675, - Ready: true, + TargetIP: "10.10.1.1", + Port: 8675, + Ready: true, + TargetRef: types.NamespacedName{Namespace: "ns1", Name: "pod1"}, }, { - TargetIP: "10.10.2.2", - Port: 8675, - Ready: false, + TargetIP: "10.10.2.2", + Port: 8675, + Ready: false, + TargetRef: types.NamespacedName{Namespace: "ns1", Name: "pod2"}, }, }, }, diff --git a/pkg/model/lattice/targets.go b/pkg/model/lattice/targets.go index ae7dccc7..9d3b6981 
100644 --- a/pkg/model/lattice/targets.go +++ b/pkg/model/lattice/targets.go @@ -2,6 +2,7 @@ package lattice import ( "github.com/aws/aws-application-networking-k8s/pkg/model/core" + "k8s.io/apimachinery/pkg/types" ) type Targets struct { @@ -18,9 +19,10 @@ type TargetsSpec struct { } type Target struct { - TargetIP string `json:"targetip"` - Port int64 `json:"port"` - Ready bool `json:"ready"` + TargetIP string `json:"targetip"` + Port int64 `json:"port"` + Ready bool `json:"ready"` + TargetRef types.NamespacedName } func NewTargets(stack core.Stack, spec TargetsSpec) (*Targets, error) { diff --git a/pkg/utils/pod_condition.go b/pkg/utils/pod_condition.go new file mode 100644 index 00000000..4807e554 --- /dev/null +++ b/pkg/utils/pod_condition.go @@ -0,0 +1,56 @@ +package utils + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "time" +) + +func PodHasReadinessGate(pod *corev1.Pod, conditionType corev1.PodConditionType) bool { + if pod == nil { + return false + } + for _, gate := range pod.Spec.ReadinessGates { + if gate.ConditionType == conditionType { + return true + } + } + return false +} + +// Copied from: k8s.io/apimachinery/pkg/apis/meta +func FindPodStatusCondition(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) *corev1.PodCondition { + for i := range conditions { + if conditions[i].Type == conditionType { + return &conditions[i] + } + } + return nil +} + +// Copied from: k8s.io/apimachinery/pkg/apis/meta +func SetPodStatusCondition(conditions *[]corev1.PodCondition, newCondition corev1.PodCondition) { + if conditions == nil { + return + } + existingCondition := FindPodStatusCondition(*conditions, newCondition.Type) + if existingCondition == nil { + if newCondition.LastTransitionTime.IsZero() { + newCondition.LastTransitionTime = metav1.NewTime(time.Now()) + } + *conditions = append(*conditions, newCondition) + return + } + + if existingCondition.Status != newCondition.Status { + 
existingCondition.Status = newCondition.Status + if !newCondition.LastTransitionTime.IsZero() { + existingCondition.LastTransitionTime = newCondition.LastTransitionTime + } else { + existingCondition.LastTransitionTime = metav1.NewTime(time.Now()) + } + } + + existingCondition.Reason = newCondition.Reason + existingCondition.Message = newCondition.Message +} diff --git a/test/pkg/test/framework.go b/test/pkg/test/framework.go index 602cfefd..a2d4c659 100644 --- a/test/pkg/test/framework.go +++ b/test/pkg/test/framework.go @@ -159,14 +159,14 @@ func (env *Framework) ExpectToBeClean(ctx context.Context) { tags[model.K8SServiceNamespaceKey] = aws.String(K8sNamespace) Eventually(func(g Gomega) { arns, err := env.TaggingClient.FindResourcesByTags(ctx, services.ResourceTypeService, tags) - env.Log.Infow("Expecting no services created by the controller", "found", arns) + env.Log.Infow("Expecting no services created by the controller", "found", arns, "err", err) g.Expect(err).To(BeNil()) g.Expect(arns).To(BeEmpty()) }).Should(Succeed()) Eventually(func(g Gomega) { arns, err := env.TaggingClient.FindResourcesByTags(ctx, services.ResourceTypeTargetGroup, tags) - env.Log.Infow("Expecting no target groups created by the controller", "found", arns) + env.Log.Infow("Expecting no target groups created by the controller", "found", arns, "err", err) g.Expect(err).To(BeNil()) g.Expect(arns).To(BeEmpty()) }).Should(Succeed()) diff --git a/test/pkg/test/nginxapp.go b/test/pkg/test/nginxapp.go index 9201f964..330aceac 100644 --- a/test/pkg/test/nginxapp.go +++ b/test/pkg/test/nginxapp.go @@ -1,6 +1,7 @@ package test import ( + "github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice" "github.com/samber/lo" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -18,6 +19,7 @@ type ElasticSearchOptions struct { TargetPort2 int MergeFromDeployment []*appsv1.Deployment MergeFromService []*v1.Service + ReadinessGate bool } func (env *Framework) NewNginxApp(options 
ElasticSearchOptions) (*appsv1.Deployment, *v1.Service) { @@ -33,6 +35,12 @@ func (env *Framework) NewNginxApp(options ElasticSearchOptions) (*appsv1.Deploym if options.TargetPort2 == 0 { options.TargetPort2 = 9114 } + var readinessGates []v1.PodReadinessGate + if options.ReadinessGate { + readinessGates = append(readinessGates, v1.PodReadinessGate{ + ConditionType: lattice.LatticeReadinessGateConditionType, + }) + } deployment := New(&appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: options.Namespace, @@ -53,6 +61,7 @@ func (env *Framework) NewNginxApp(options ElasticSearchOptions) (*appsv1.Deploym }, }, Spec: v1.PodSpec{ + ReadinessGates: readinessGates, Containers: []v1.Container{ { Name: options.Name, diff --git a/test/suites/integration/readiness_gate_test.go b/test/suites/integration/readiness_gate_test.go new file mode 100644 index 00000000..87ab1345 --- /dev/null +++ b/test/suites/integration/readiness_gate_test.go @@ -0,0 +1,66 @@ +package integration + +import ( + "github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" + "github.com/aws/aws-application-networking-k8s/pkg/utils" + "github.com/aws/aws-application-networking-k8s/test/pkg/test" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + gwv1 "sigs.k8s.io/gateway-api/apis/v1" +) + +var _ = Describe("Pod Readiness Gate", Ordered, func() { + var ( + deployment *appsv1.Deployment + service *v1.Service + route *gwv1.HTTPRoute + ) + + BeforeAll(func() { + deployment, service = testFramework.NewNginxApp(test.ElasticSearchOptions{ + Name: "pod-test", + Namespace: k8snamespace, + ReadinessGate: true, + }) + route = testFramework.NewHttpRoute(testGateway, service, service.Kind) + testFramework.ExpectCreated(ctx, deployment, service, route) + }) + + It("updates condition when injected", func() { + Eventually(func(g Gomega) { + pods := testFramework.GetPodsByDeploymentName(deployment.Name, deployment.Namespace) + for _, pod := range pods { + cond := utils.FindPodStatusCondition(pod.Status.Conditions, lattice.LatticeReadinessGateConditionType) + g.Expect(cond).To(Not(BeNil())) + g.Expect(cond.Status).To(Equal(v1.ConditionTrue)) + } + }).Should(Succeed()) + }) + + It("updates condition when a new pod is added", func() { + testFramework.Get(ctx, k8s.NamespacedName(deployment), deployment) + deployment.Spec.Replicas = lo.ToPtr(int32(3)) + testFramework.ExpectUpdated(ctx, deployment) + + Eventually(func(g Gomega) { + pods := testFramework.GetPodsByDeploymentName(deployment.Name, deployment.Namespace) + for _, pod := range pods { + cond := utils.FindPodStatusCondition(pod.Status.Conditions, lattice.LatticeReadinessGateConditionType) + g.Expect(cond).To(Not(BeNil())) + g.Expect(cond.Status).To(Equal(v1.ConditionTrue)) + } + }).Should(Succeed()) + }) + + AfterAll(func() { + testFramework.ExpectDeletedThenNotFound(ctx, + deployment, + service, + route, + ) + }) +})