diff --git a/controllers/eventhandlers/endpoints.go b/controllers/eventhandlers/endpoints.go deleted file mode 100644 index 928db8ec..00000000 --- a/controllers/eventhandlers/endpoints.go +++ /dev/null @@ -1,82 +0,0 @@ -package eventhandlers - -import ( - "context" - "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -type enqueueRequestsForEndpointsEvent struct { - log gwlog.Logger - client client.Client -} - -func NewEnqueueRequestEndpointEvent(log gwlog.Logger, client client.Client) handler.EventHandler { - return &enqueueRequestsForEndpointsEvent{ - log: log, - client: client, - } -} - -func (h *enqueueRequestsForEndpointsEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) { - h.log.Info("Event: endpoint create") - - epNew := e.Object.(*corev1.Endpoints) - h.enqueueImpactedService(queue, epNew) -} - -func (h *enqueueRequestsForEndpointsEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) { - h.log.Info("Event: endpoints update") - epOld := e.ObjectOld.(*corev1.Endpoints) - epNew := e.ObjectNew.(*corev1.Endpoints) - if !equality.Semantic.DeepEqual(epOld.Subsets, epNew.Subsets) { - h.enqueueImpactedService(queue, epNew) - } -} - -func (h *enqueueRequestsForEndpointsEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) { - h.log.Infof("Event: endpoints delete") - // service event handler handles this event here -} - -func (h *enqueueRequestsForEndpointsEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) { - -} - -func (h *enqueueRequestsForEndpointsEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, ep *corev1.Endpoints) { - h.log.Infof("Event: enqueueImpactedService for service name %s, namespace %s", ep.Name, ep.Namespace) - - var targetIPList []string - for _, endPoint := range ep.Subsets { - for _, address := range endPoint.Addresses { - targetIPList = append(targetIPList, address.IP) - } - } - - svc := &corev1.Service{} - namespaceName := types.NamespacedName{ - Namespace: ep.Namespace, - Name: ep.Name, - } - - if err := h.client.Get(context.TODO(), namespaceName, svc); err != nil { - h.log.Infof("Event: enqueueImpactedService, service not found %v\n", err) - return - } - - queue.Add(reconcile.Request{ - NamespacedName: namespaceName, - }) - - h.log.Infof("Finished enqueueImpactedService for service name %s, namespace %s targetIPLIST[%v]", - ep.Name, ep.Namespace, targetIPList) -} diff --git a/controllers/eventhandlers/mapper.go b/controllers/eventhandlers/mapper.go new file mode 100644 index 00000000..0473fd7a --- /dev/null +++ b/controllers/eventhandlers/mapper.go @@ -0,0 +1,161 @@ +package eventhandlers + +import ( + "context" + "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" + "github.com/aws/aws-application-networking-k8s/pkg/model/core" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1" + mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +type resourceMapper struct { + log gwlog.Logger + client client.Client +} + +const ( + coreGroupName = "" // empty means core by definition + serviceKind = "Service" + serviceImportKind = "ServiceImport" +) + +func (r *resourceMapper) ServiceToRoutes(ctx context.Context, svc *corev1.Service, routeType core.RouteType) []core.Route { + if svc == nil { + return nil + } + return r.backendRefToRoutes(ctx, svc, coreGroupName, serviceKind, routeType) +} + +func (r *resourceMapper) ServiceImportToRoutes(ctx context.Context, svc *mcs_api.ServiceImport, routeType core.RouteType) []core.Route { + if svc == nil { + return nil + } + return r.backendRefToRoutes(ctx, svc, mcs_api.GroupName, serviceImportKind, routeType) +} + +func (r *resourceMapper) ServiceToServiceExport(ctx context.Context, svc *corev1.Service) *mcs_api.ServiceExport { + if svc == nil { + return nil + } + svcExport := &mcs_api.ServiceExport{} + if err := r.client.Get(ctx, k8s.NamespacedName(svc), svcExport); err != nil { + return nil + } + return svcExport +} + +func (r *resourceMapper) EndpointsToService(ctx context.Context, ep *corev1.Endpoints) *corev1.Service { + if ep == nil { + return nil + } + svc := &corev1.Service{} + if err := r.client.Get(ctx, k8s.NamespacedName(ep), svc); err != nil { + return nil + } + return svc +} + +func (r *resourceMapper) TargetGroupPolicyToService(ctx context.Context, tgp *v1alpha1.TargetGroupPolicy) *corev1.Service { + if tgp == nil { + return nil + } + policyName := k8s.NamespacedName(tgp).String() + + targetRef := tgp.Spec.TargetRef + if targetRef == nil { + r.log.Infow("TargetGroupPolicy does not have targetRef, skipping", + "policyName", policyName) + return nil + } + if targetRef.Group != coreGroupName || targetRef.Kind != serviceKind { + r.log.Infow("Detected non-Service TargetGroupPolicy attachment, skipping", + "policyName", policyName, "targetRef", targetRef) + return nil + } + namespace := tgp.Namespace + if targetRef.Namespace != nil && namespace != string(*targetRef.Namespace) { + r.log.Infow("Detected cross namespace TargetGroupPolicy attachment, skipping", + "policyName", policyName, "targetRef", targetRef) + return nil + } + + svcName := types.NamespacedName{ + Namespace: namespace, + Name: string(targetRef.Name), + } + svc := &corev1.Service{} + if err := r.client.Get(ctx, svcName, svc); err != nil { + if errors.IsNotFound(err) { + r.log.Debugw("TargetGroupPolicy is referring to non-existent service, skipping", + "policyName", policyName, "serviceName", svcName.String()) + } else { + // Still gracefully skipping the event but errors other than NotFound are bad sign. + r.log.Errorw("Failed to query targetRef of TargetGroupPolicy", + "policyName", policyName, "serviceName", svcName.String(), "reason", err.Error()) + } + return nil + } + r.log.Debugw("TargetGroupPolicy change on Service detected", + "policyName", policyName, "serviceName", svcName.String()) + + return svc +} + +func (r *resourceMapper) backendRefToRoutes(ctx context.Context, obj client.Object, group, kind string, routeType core.RouteType) []core.Route { + if obj == nil { + return nil + } + var routes []core.Route + switch routeType { + case core.HttpRouteType: + routeList := &gateway_api.HTTPRouteList{} + r.client.List(ctx, routeList) + for _, k8sRoute := range routeList.Items { + routes = append(routes, core.NewHTTPRoute(k8sRoute)) + } + case core.GrpcRouteType: + routeList := &gateway_api_v1alpha2.GRPCRouteList{} + r.client.List(ctx, routeList) + for _, k8sRoute := range routeList.Items { + routes = append(routes, core.NewGRPCRoute(k8sRoute)) + } + default: + return nil + } + + var filteredRoutes []core.Route + for _, route := range routes { + if r.isBackendRefUsedByRoute(route, obj, group, kind) { + filteredRoutes = append(filteredRoutes, route) + } + } + return filteredRoutes +} + +func (r *resourceMapper) isBackendRefUsedByRoute(route core.Route, obj k8s.NamespacedAndNamed, group, kind string) bool { + for _, rule := range route.Spec().Rules() { + for _, backendRef := range rule.BackendRefs() { + isGroupEqual := backendRef.Group() != nil && string(*backendRef.Group()) == group + isKindEqual := backendRef.Kind() != nil && string(*backendRef.Kind()) == kind + isNameEqual := string(backendRef.Name()) == obj.GetName() + + namespace := route.Namespace() + if backendRef.Namespace() != nil { + namespace = string(*backendRef.Namespace()) + } + isNamespaceEqual := namespace == obj.GetNamespace() + + if isGroupEqual && isKindEqual && isNameEqual && isNamespaceEqual { + return true + } + } + } + return false +} diff --git a/controllers/eventhandlers/targetgrouppolicy_test.go b/controllers/eventhandlers/mapper_test.go similarity index 68% rename from controllers/eventhandlers/targetgrouppolicy_test.go rename to controllers/eventhandlers/mapper_test.go index d119ed14..d1604787 100644 --- a/controllers/eventhandlers/targetgrouppolicy_test.go +++ b/controllers/eventhandlers/mapper_test.go @@ -5,98 +5,18 @@ import ( "errors" mock_client "github.com/aws/aws-application-networking-k8s/mocks/controller-runtime/client" "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/model/core" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1" - mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "testing" ) -func TestGetTargetRef(t *testing.T) { - c := gomock.NewController(t) - defer c.Finish() - - ns1 := "default" - ns2 := "non-default" - - testCases := []struct { - namespace string - targetKind gateway_api.Kind - targetNamespace *gateway_api.Namespace - serviceFound bool - success bool - }{ - { - namespace: ns1, - targetKind: "Service", - targetNamespace: (*gateway_api.Namespace)(&ns2), - success: false, - }, - { - namespace: ns1, - targetKind: "NotService", - targetNamespace: (*gateway_api.Namespace)(&ns1), - success: false, - }, - { - namespace: ns1, - targetKind: "Service", - targetNamespace: (*gateway_api.Namespace)(&ns1), - serviceFound: false, - success: false, - }, - { - namespace: ns1, - targetKind: "Service", - targetNamespace: (*gateway_api.Namespace)(&ns1), - serviceFound: true, - success: true, - }, - { - namespace: ns1, - targetKind: "Service", - targetNamespace: nil, - serviceFound: true, - success: true, - }, - } - - for _, tt := range testCases { - mockClient := mock_client.NewMockClient(c) - h := NewTargetGroupPolicyEventHandler(gwlog.NewLogger(true), mockClient) - if tt.serviceFound { - mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - } else { - mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("fail")).AnyTimes() - } - svc := h.getTargetRef(&v1alpha1.TargetGroupPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-policy", - Namespace: tt.namespace, - }, - Spec: v1alpha1.TargetGroupPolicySpec{ - TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{ - Group: "", - Kind: tt.targetKind, - Name: "test-service", - Namespace: tt.targetNamespace, - }, - }, - }) - if tt.success { - assert.NotNil(t, svc) - } else { - assert.Nil(t, svc) - } - } -} - func createHTTPRoute(name, namespace string, backendRef gateway_api.BackendObjectReference) gateway_api.HTTPRoute { return gateway_api.HTTPRoute{ ObjectMeta: metav1.ObjectMeta{ @@ -119,7 +39,7 @@ func createHTTPRoute(name, namespace string, backendRef gateway_api.BackendObjec } } -func TestMapToHTTPRoute(t *testing.T) { +func TestServiceToRoutes(t *testing.T) { c := gomock.NewController(t) defer c.Finish() @@ -133,12 +53,25 @@ func TestMapToHTTPRoute(t *testing.T) { Namespace: nil, Name: "test-service", }), + createHTTPRoute("invalid-nil-group", "ns1", gateway_api.BackendObjectReference{ + Kind: (*gateway_api.Kind)(pointer.String("Service")), + Namespace: nil, + Name: "test-service", + }), + createHTTPRoute("invalid-group", "ns1", gateway_api.BackendObjectReference{ + Group: (*gateway_api.Group)(pointer.String("not-core")), + Kind: (*gateway_api.Kind)(pointer.String("Service")), + Namespace: nil, + Name: "test-service", + }), createHTTPRoute("valid-inferred-namespace", "ns1", gateway_api.BackendObjectReference{ + Group: (*gateway_api.Group)(pointer.String("")), Kind: (*gateway_api.Kind)(pointer.String("Service")), Namespace: nil, Name: "test-service", }), createHTTPRoute("valid-explicit-namespace", "ns1", gateway_api.BackendObjectReference{ + Group: (*gateway_api.Group)(pointer.String("")), Kind: (*gateway_api.Kind)(pointer.String("Service")), Namespace: (*gateway_api.Namespace)(pointer.String("ns1")), Name: "test-service", @@ -160,14 +93,6 @@ func TestMapToHTTPRoute(t *testing.T) { } mockClient := mock_client.NewMockClient(c) - h := NewTargetGroupPolicyEventHandler(gwlog.NewLogger(true), mockClient) - mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, name types.NamespacedName, svc *corev1.Service, _ ...interface{}) error { - svc.SetName("test-service") - svc.SetNamespace("ns1") - return nil - }, - ) mockClient.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, routeList *gateway_api.HTTPRouteList, _ ...interface{}) error { for _, route := range routes { @@ -176,59 +101,96 @@ func TestMapToHTTPRoute(t *testing.T) { return nil }, ) - reqs := h.MapToHTTPRoute(&v1alpha1.TargetGroupPolicy{ + + mapper := &resourceMapper{log: gwlog.FallbackLogger, client: mockClient} + res := mapper.ServiceToRoutes(context.Background(), &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-policy", + Name: "test-service", Namespace: "ns1", }, - Spec: v1alpha1.TargetGroupPolicySpec{ - TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{ - Group: "", - Kind: "Service", - Name: "test-service", - }, - }, - }) - assert.Len(t, reqs, len(validRoutes)) - for i, req := range reqs { - assert.Equal(t, validRoutes[i], req.Name) + }, core.HttpRouteType) + + assert.Len(t, res, len(validRoutes)) + for i, r := range res { + assert.Equal(t, validRoutes[i], r.Name()) } } -func TestMapToServiceExport(t *testing.T) { +func TestTargetGroupPolicyToService(t *testing.T) { c := gomock.NewController(t) defer c.Finish() - mockClient := mock_client.NewMockClient(c) - h := NewTargetGroupPolicyEventHandler(gwlog.NewLogger(true), mockClient) - mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, name types.NamespacedName, svc *corev1.Service, _ ...interface{}) error { - svc.SetName("test-service") - svc.SetNamespace("ns1") - return nil + ns1 := "default" + ns2 := "non-default" + + testCases := []struct { + namespace string + targetKind gateway_api.Kind + targetNamespace *gateway_api.Namespace + serviceFound bool + success bool + }{ + { + namespace: ns1, + targetKind: "Service", + targetNamespace: (*gateway_api.Namespace)(&ns2), + success: false, }, - ) - mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, name types.NamespacedName, svc *mcs_api.ServiceExport, _ ...interface{}) error { - svc.SetName("test-service") - svc.SetNamespace("ns1") - return nil + { + namespace: ns1, + targetKind: "NotService", + targetNamespace: (*gateway_api.Namespace)(&ns1), + success: false, }, - ) - - reqs := h.MapToServiceExport(&v1alpha1.TargetGroupPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-policy", - Namespace: "ns1", + { + namespace: ns1, + targetKind: "Service", + targetNamespace: (*gateway_api.Namespace)(&ns1), + serviceFound: false, + success: false, }, - Spec: v1alpha1.TargetGroupPolicySpec{ - TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{ - Group: "", - Kind: "Service", - Name: "test-service", - }, + { + namespace: ns1, + targetKind: "Service", + targetNamespace: (*gateway_api.Namespace)(&ns1), + serviceFound: true, + success: true, }, - }) - assert.Len(t, reqs, 1) - assert.Equal(t, "test-service", reqs[0].Name) + { + namespace: ns1, + targetKind: "Service", + targetNamespace: nil, + serviceFound: true, + success: true, + }, + } + + for _, tt := range testCases { + mockClient := mock_client.NewMockClient(c) + mapper := &resourceMapper{log: gwlog.FallbackLogger, client: mockClient} + if tt.serviceFound { + mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + } else { + mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("fail")).AnyTimes() + } + svc := mapper.TargetGroupPolicyToService(context.Background(), &v1alpha1.TargetGroupPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-policy", + Namespace: tt.namespace, + }, + Spec: v1alpha1.TargetGroupPolicySpec{ + TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{ + Group: "", + Kind: tt.targetKind, + Name: "test-service", + Namespace: tt.targetNamespace, + }, + }, + }) + if tt.success { + assert.NotNil(t, svc) + } else { + assert.Nil(t, svc) + } + } } diff --git a/controllers/eventhandlers/route.go b/controllers/eventhandlers/route.go deleted file mode 100644 index 990e9f30..00000000 --- a/controllers/eventhandlers/route.go +++ /dev/null @@ -1,100 +0,0 @@ -package eventhandlers - -import ( - "context" - "github.com/aws/aws-application-networking-k8s/pkg/model/core" - "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -type enqueueRequestsForRouteEvent struct { - log gwlog.Logger - client client.Client -} - -func NewEnqueueRequestRouteEvent(log gwlog.Logger, client client.Client) handler.EventHandler { - return &enqueueRequestsForRouteEvent{ - log: log, - client: client, - } -} - -func (h *enqueueRequestsForRouteEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) { - h.log.Infof("Received CreateRoute event for %s", e.Object.GetName()) - h.log.Debugf("CreateRoute event %+v", e) - - newRoute, err := core.NewRoute(e.Object) - if err != nil { - h.log.Errorf("Error while reading route in CreateRouteEvent %s", err) - } - - h.enqueueImpactedService(queue, newRoute) -} - -func (h *enqueueRequestsForRouteEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) { - h.log.Infof("Received UpdateRoute event for %s", e.ObjectOld.GetName()) - h.log.Debugf("UpdateRoute event %+v", e) - - oldRoute, err := core.NewRoute(e.ObjectOld) - if err != nil { - h.log.Errorf("Error while reading old route in UpdateRouteEvent %s", err) - } - - newRoute, err := core.NewRoute(e.ObjectNew) - if err != nil { - h.log.Errorf("Error while reading new route UpdateRouteEvent %s", err) - } - - if !oldRoute.Spec().Equals(newRoute.Spec()) { - h.log.Debugf("New and old route are different. Old: %+v, New: %+v", oldRoute.Spec(), newRoute.Spec()) - parents := newRoute.Status().Parents() - if parents != nil && parents[0].Conditions[0].LastTransitionTime != ZeroTransitionTime { - h.log.Info("Update Gateway Event, reset LastTransitionTime for gateway") - parents[0].Conditions[0].LastTransitionTime = ZeroTransitionTime - - } - h.enqueueImpactedService(queue, newRoute) - } -} - -func (h *enqueueRequestsForRouteEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) { - // TODO -} - -func (h *enqueueRequestsForRouteEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) { - -} - -func (h *enqueueRequestsForRouteEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, route core.Route) { - h.log.Infof("enqueueImpactedService [%v]", route) - - for _, rule := range route.Spec().Rules() { - for _, backendRef := range rule.BackendRefs() { - // TODOif backendRef.Kind == "service" { - namespaceName := types.NamespacedName{ - Namespace: route.Namespace(), - Name: string(backendRef.Name()), - } - - svc := &corev1.Service{} - if err := h.client.Get(context.TODO(), namespaceName, svc); err != nil { - h.log.Infof("Unknown service %+v", namespaceName) - continue - } - - h.log.Infof("Adding to service %+v to queue", namespaceName) - queue.Add(reconcile.Request{ - NamespacedName: namespaceName, - }) - - //} - } - } -} diff --git a/controllers/eventhandlers/service.go b/controllers/eventhandlers/service.go index ef14f0ee..989090ed 100644 --- a/controllers/eventhandlers/service.go +++ b/controllers/eventhandlers/service.go @@ -2,169 +2,78 @@ package eventhandlers import ( "context" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - + "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" "github.com/aws/aws-application-networking-k8s/pkg/model/core" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" - + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/event" - mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) -type enqueueRequestForServiceWithExportEvent struct { +type serviceEventHandler struct { log gwlog.Logger client client.Client + mapper *resourceMapper } -func NewEqueueRequestServiceWithExportEvent(log gwlog.Logger, client client.Client) handler.EventHandler { - return &enqueueRequestForServiceWithExportEvent{ - log: log, - client: client, - } -} - -func (h *enqueueRequestForServiceWithExportEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) { - h.log.Info("Event: service create") - service := e.Object.(*corev1.Service) - h.enqueueImpactedService(queue, service) - h.enqueueImpactedServiceExport(queue, service) -} - -func (h *enqueueRequestForServiceWithExportEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) { -} - -func (h *enqueueRequestForServiceWithExportEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) { - h.log.Info("Event: service delete") - service := e.Object.(*corev1.Service) - h.enqueueImpactedService(queue, service) - h.enqueueImpactedServiceExport(queue, service) +func NewServiceEventHandler(log gwlog.Logger, client client.Client) *serviceEventHandler { + return &serviceEventHandler{log: log, client: client, + mapper: &resourceMapper{log: log, client: client}} } -func (h *enqueueRequestForServiceWithExportEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) { - -} - -func (h *enqueueRequestForServiceWithExportEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, ep *corev1.Service) { - h.log.Infof("Event: enqueueImpactedService: service name: %s, service namespace: %s", ep.Name, ep.Namespace) - - srv := &corev1.Service{} - namespacedName := types.NamespacedName{ - Namespace: ep.Namespace, - Name: ep.Name, - } - - if err := h.client.Get(context.TODO(), namespacedName, srv); err != nil { - h.log.Infof("Event: enqueueImpactedService, service not found %v\n", err) - return - } - - queue.Add(reconcile.Request{ - NamespacedName: namespacedName, +func (h *serviceEventHandler) MapToRoute(routeType core.RouteType) handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request { + return h.mapToRoute(obj, routeType) }) - } -func (h *enqueueRequestForServiceWithExportEvent) enqueueImpactedServiceExport(queue workqueue.RateLimitingInterface, ep *corev1.Service) { - h.log.Infof("Event: enqueueImpactedServiceExport: service name %s, service namespace %s", ep.Name, ep.Namespace) - - srvExport := &mcs_api.ServiceExport{} - namespacedName := types.NamespacedName{ - Namespace: ep.Namespace, - Name: ep.Name, - } - - if err := h.client.Get(context.TODO(), namespacedName, srvExport); err != nil { - h.log.Infof("Event: enqueueImpactedServiceExport, serviceexport not found %v\n", err) - return - } - - queue.Add(reconcile.Request{ - NamespacedName: namespacedName, +func (h *serviceEventHandler) MapToServiceExport() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request { + return h.mapToServiceExport(obj) }) } -type enqueueRequestForServiceWithRoutesEvent struct { - log gwlog.Logger - client client.Client -} +func (h *serviceEventHandler) mapToServiceExport(obj client.Object) []reconcile.Request { + var requests []reconcile.Request -func NewEnqueueRequestForServiceWithRoutesEvent(log gwlog.Logger, client client.Client) handler.EventHandler { - return &enqueueRequestForServiceWithRoutesEvent{ - log: log, - client: client, + ctx := context.Background() + svc := h.mapToService(ctx, obj) + svcExport := h.mapper.ServiceToServiceExport(ctx, svc) + if svcExport != nil { + requests = append(requests, reconcile.Request{ + NamespacedName: k8s.NamespacedName(svcExport), + }) + h.log.Infow("Service impacting resource change triggered ServiceExport update", + "serviceName", svc.Namespace+"/"+svc.Name) } + return requests } -func (h *enqueueRequestForServiceWithRoutesEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) { - service := e.Object.(*corev1.Service) - h.enqueueImpactedRoutes(queue, service) -} - -func (h *enqueueRequestForServiceWithRoutesEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) { -} - -func (h *enqueueRequestForServiceWithRoutesEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) { - service := e.Object.(*corev1.Service) - h.enqueueImpactedRoutes(queue, service) -} - -func (h *enqueueRequestForServiceWithRoutesEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) { - -} - -func (h *enqueueRequestForServiceWithRoutesEvent) enqueueImpactedRoutes(queue workqueue.RateLimitingInterface, ep *corev1.Service) { - h.log.Infof("Event: enqueueImpactedRoutes for service name %s, namespace %s", ep.Name, ep.Namespace) - - routes, err := core.ListAllRoutes(context.TODO(), h.client) - if err != nil { - h.log.Errorf("Failed to list all routes, %s", err) - return - } - for _, route := range routes { - if !isServiceUsedByRoute(route, ep) { - continue - } - h.log.Infof("Event: enqueueImpactedRoutes --> route %v", route) - namespacedName := types.NamespacedName{ - Namespace: route.Namespace(), - Name: route.Name(), - } - queue.Add(reconcile.Request{ - NamespacedName: namespacedName, - }) +func (h *serviceEventHandler) mapToService(ctx context.Context, obj client.Object) *corev1.Service { + switch typed := obj.(type) { + case *corev1.Service: + return typed + case *v1alpha1.TargetGroupPolicy: + return h.mapper.TargetGroupPolicyToService(ctx, typed) + case *corev1.Endpoints: + return h.mapper.EndpointsToService(ctx, typed) } + return nil } -func isServiceUsedByRoute(route core.Route, ep *corev1.Service) bool { - for _, rule := range route.Spec().Rules() { - for _, backendRef := range rule.BackendRefs() { - if string(*backendRef.Kind()) != "service" { - continue - } - - if string(backendRef.Name()) != ep.Name { - continue - } - - namespace := route.Namespace() - if backendRef.Namespace() != nil { - namespace = string(*backendRef.Namespace()) - } +func (h *serviceEventHandler) mapToRoute(obj client.Object, routeType core.RouteType) []reconcile.Request { + ctx := context.Background() + svc := h.mapToService(ctx, obj) + routes := h.mapper.ServiceToRoutes(ctx, svc, routeType) - if namespace != ep.Namespace { - continue - } - - return true - } + var requests []reconcile.Request + for _, route := range routes { + routeName := k8s.NamespacedName(route.K8sObject()) + requests = append(requests, reconcile.Request{NamespacedName: routeName}) + h.log.Infow("Service impacting resource change triggered Route update", + "serviceName", svc.Namespace+"/"+svc.Name, "routeName", routeName, "routeType", routeType) } - - return false + return requests } diff --git a/controllers/eventhandlers/service_test.go b/controllers/eventhandlers/service_test.go new file mode 100644 index 00000000..dc031daf --- /dev/null +++ b/controllers/eventhandlers/service_test.go @@ -0,0 +1,131 @@ +package eventhandlers + +import ( + "context" + mock_client "github.com/aws/aws-application-networking-k8s/mocks/controller-runtime/client" + "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/model/core" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1" + "testing" +) + +func TestServiceEventHandler_MapToRoute(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + + routes := []gateway_api.HTTPRoute{ + createHTTPRoute("valid-route", "ns1", gateway_api.BackendObjectReference{ + Group: (*gateway_api.Group)(pointer.String("")), + Kind: (*gateway_api.Kind)(pointer.String("Service")), + Namespace: (*gateway_api.Namespace)(pointer.String("ns1")), + Name: "test-service", + }), + } + mockClient := mock_client.NewMockClient(c) + h := NewServiceEventHandler(gwlog.FallbackLogger, mockClient) + mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, name types.NamespacedName, svc client.Object, _ ...interface{}) error { + svc.SetName("test-service") + svc.SetNamespace("ns1") + return nil + }, + ).AnyTimes() + mockClient.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, routeList *gateway_api.HTTPRouteList, _ ...interface{}) error { + for _, route := range routes { + routeList.Items = append(routeList.Items, route) + } + return nil + }, + ).AnyTimes() + + objs := []client.Object{ + &v1alpha1.TargetGroupPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-policy", + Namespace: "ns1", + }, + Spec: v1alpha1.TargetGroupPolicySpec{ + TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{ + Group: "", + Kind: "Service", + Name: "test-service", + }, + }, + }, + &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "ns1", + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "ns1", + }, + }, + } + for _, obj := range objs { + reqs := h.mapToRoute(obj, core.HttpRouteType) + assert.Len(t, reqs, 1) + assert.Equal(t, "valid-route", reqs[0].Name) + } +} + +func TestServiceEventHandler_MapToServiceExport(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + + mockClient := mock_client.NewMockClient(c) + h := NewServiceEventHandler(gwlog.FallbackLogger, mockClient) + mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, name types.NamespacedName, svcOrSvcExport client.Object, _ ...interface{}) error { + svcOrSvcExport.SetName("test-service") + svcOrSvcExport.SetNamespace("ns1") + return nil + }, + ).AnyTimes() + + objs := []client.Object{ + &v1alpha1.TargetGroupPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-policy", + Namespace: "ns1", + }, + Spec: v1alpha1.TargetGroupPolicySpec{ + TargetRef: &gateway_api_v1alpha2.PolicyTargetReference{ + Group: "", + Kind: "Service", + Name: "test-service", + }, + }, + }, + &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "ns1", + }, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "ns1", + }, + }, + } + for _, obj := range objs { + reqs := h.mapToServiceExport(obj) + assert.Len(t, reqs, 1) + assert.Equal(t, "test-service", reqs[0].Name) + } +} diff --git a/controllers/eventhandlers/serviceexport.go b/controllers/eventhandlers/serviceexport.go deleted file mode 100644 index 7c29657a..00000000 --- a/controllers/eventhandlers/serviceexport.go +++ /dev/null @@ -1,70 +0,0 @@ -package eventhandlers - -import ( - "context" - "github.com/golang/glog" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/types" - - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/event" - mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" -) - -type enqueueRequestsForServiceExportEvent struct { - client client.Client -} - -func NewEqueueRequestServiceExportEvent(client client.Client) handler.EventHandler { - return &enqueueRequestsForServiceExportEvent{ - client: client, - } -} - -func (h *enqueueRequestsForServiceExportEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) { - newServiceExport := e.Object.(*mcs_api.ServiceExport) - h.enqueueImpactedService(queue, newServiceExport) -} - -func (h *enqueueRequestsForServiceExportEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) { - oldServiceExport := e.ObjectOld.(*mcs_api.ServiceExport) - newServiceExport := e.ObjectNew.(*mcs_api.ServiceExport) - - if !equality.Semantic.DeepEqual(oldServiceExport, newServiceExport) { - h.enqueueImpactedService(queue, newServiceExport) - } -} - -func (h *enqueueRequestsForServiceExportEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) { - glog.V(6).Infof("TODO serviceExport Delete \n") -} - -func (h *enqueueRequestsForServiceExportEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) { - -} - -func (h *enqueueRequestsForServiceExportEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, serviceExport *mcs_api.ServiceExport) { - glog.V(6).Infof("enqueueImpactedService, serviceExport[%v]\n", serviceExport) - - namespaceName := types.NamespacedName{ - Namespace: serviceExport.Namespace, - Name: serviceExport.Name, - } - - svc := &corev1.Service{} - - if err := h.client.Get(context.TODO(), namespaceName, svc); err != nil { - glog.V(6).Infof("enqueueImpactedService, unknown service [%v]\n", namespaceName) - return - } - - queue.Add(reconcile.Request{ - NamespacedName: namespaceName, - }) -} diff --git a/controllers/eventhandlers/serviceimport.go b/controllers/eventhandlers/serviceimport.go index be65139b..155e407d 100644 --- a/controllers/eventhandlers/serviceimport.go +++ b/controllers/eventhandlers/serviceimport.go @@ -3,9 +3,6 @@ package eventhandlers import ( "context" - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/types" - "github.com/aws/aws-application-networking-k8s/pkg/model/core" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" @@ -13,111 +10,40 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/event" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) -type enqueueRequestsForServiceImportEvent struct { - log gwlog.Logger - client client.Client - routeType core.RouteType +type serviceImportEventHandler struct { + log gwlog.Logger + client client.Client + mapper *resourceMapper } -func NewEqueueRequestServiceImportEvent(log gwlog.Logger, client client.Client, routeType core.RouteType) handler.EventHandler { - return &enqueueRequestsForServiceImportEvent{ - log: log, - client: client, - routeType: routeType, +func NewServiceImportEventHandler(log gwlog.Logger, client client.Client) *serviceImportEventHandler { + return &serviceImportEventHandler{ + log: log, + client: client, + mapper: &resourceMapper{log: log, client: client}, } } -func (h *enqueueRequestsForServiceImportEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) { - newServiceImport := e.Object.(*mcs_api.ServiceImport) - h.enqueueImpactedService(queue, newServiceImport) +func (h *serviceImportEventHandler) MapToRoute(routeType core.RouteType) handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request { + return h.mapToRoute(obj, routeType) + }) } -func (h *enqueueRequestsForServiceImportEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) { - oldServiceImport := e.ObjectOld.(*mcs_api.ServiceImport) - newServiceImport := e.ObjectNew.(*mcs_api.ServiceImport) - - if !equality.Semantic.DeepEqual(oldServiceImport, newServiceImport) { - h.enqueueImpactedService(queue, newServiceImport) - } -} - -func (h *enqueueRequestsForServiceImportEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) { - // TODO - oldServiceImport := e.Object.(*mcs_api.ServiceImport) - h.enqueueImpactedService(queue, oldServiceImport) - -} - -func (h *enqueueRequestsForServiceImportEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) { - -} - -func (h *enqueueRequestsForServiceImportEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, serviceImport *mcs_api.ServiceImport) { - h.log.Infof("enqueueImpactedRoute, serviceImport[%v]", serviceImport) - - var routes []core.Route - var err error - - switch h.routeType { - case core.HttpRouteType: - routes, err = core.ListHTTPRoutes(context.TODO(), h.client) - case core.GrpcRouteType: - routes, err = core.ListGRPCRoutes(context.TODO(), h.client) - default: - h.log.Errorf("Invalid routeType %s", h.routeType) - return - } - - if err != nil { - h.log.Errorf("Error while listing routes, %s", err) - } +func (h *serviceImportEventHandler) mapToRoute(obj client.Object, routeType core.RouteType) []reconcile.Request { + ctx := context.Background() + routes := h.mapper.ServiceImportToRoutes(ctx, obj.(*mcs_api.ServiceImport), routeType) + var requests []reconcile.Request for _, route := range routes { - if !isServiceImportUsedByRoute(route, serviceImport) { - continue - } - - h.log.Infof("enqueueRequestsForServiceImportEvent: route name %s, namespace %s", - route.Name(), route.Namespace()) - - namespacedName := types.NamespacedName{ - Namespace: route.Namespace(), - Name: route.Name(), - } - - queue.Add(reconcile.Request{ - NamespacedName: namespacedName, - }) - } -} - -func isServiceImportUsedByRoute(route core.Route, serviceImport *mcs_api.ServiceImport) bool { - for _, rule := range route.Spec().Rules() { - for _, backendRef := range rule.BackendRefs() { - if string(*backendRef.Kind()) != "ServiceImport" { - continue - } - - if string(backendRef.Name()) != serviceImport.Name { - continue - } - - namespace := route.Namespace() - if backendRef.Namespace() != nil { - namespace = string(*backendRef.Namespace()) - } - - if namespace != serviceImport.Namespace { - continue - } - - return true - } + routeName := k8s.NamespacedName(route.K8sObject()) + requests = append(requests, reconcile.Request{NamespacedName: routeName}) + h.log.Infow("ServiceImport resource change triggered Route update", + "serviceName", obj.GetNamespace()+"/"+obj.GetName(), "routeName", routeName, "routeType", routeType) } - return false + return requests } diff --git a/controllers/eventhandlers/serviceimport_test.go b/controllers/eventhandlers/serviceimport_test.go new file mode 100644 index 00000000..1b1c5d9f --- /dev/null +++ b/controllers/eventhandlers/serviceimport_test.go @@ -0,0 +1,48 @@ +package eventhandlers + +import ( + "context" + mock_client "github.com/aws/aws-application-networking-k8s/mocks/controller-runtime/client" + "github.com/aws/aws-application-networking-k8s/pkg/model/core" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1" + mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + "testing" +) + +func TestServiceImportEventHandler_MapToRoute(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + + routes := []gateway_api.HTTPRoute{ + createHTTPRoute("valid-route", "ns1", gateway_api.BackendObjectReference{ + Group: (*gateway_api.Group)(pointer.String("multicluster.x-k8s.io")), + Kind: (*gateway_api.Kind)(pointer.String("ServiceImport")), + Namespace: (*gateway_api.Namespace)(pointer.String("ns1")), + Name: "test-service", + }), + } + mockClient := mock_client.NewMockClient(c) + h := NewServiceImportEventHandler(gwlog.FallbackLogger, mockClient) + mockClient.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, routeList *gateway_api.HTTPRouteList, _ ...interface{}) error { + for _, route := range routes { + routeList.Items = append(routeList.Items, route) + } + return nil + }, + ).AnyTimes() + + reqs := h.mapToRoute(&mcs_api.ServiceImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "ns1", + }, + }, core.HttpRouteType) + assert.Len(t, reqs, 1) + assert.Equal(t, "valid-route", reqs[0].Name) +} diff --git a/controllers/eventhandlers/targetgrouppolicy.go b/controllers/eventhandlers/targetgrouppolicy.go deleted file mode 100644 index 32b152af..00000000 --- a/controllers/eventhandlers/targetgrouppolicy.go +++ /dev/null @@ -1,167 +0,0 @@ -package eventhandlers - -import ( - "context" - "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" - "github.com/aws/aws-application-networking-k8s/pkg/model/core" - "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" - gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1" - mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" -) - -type targetGroupPolicyEventHandler struct { - log gwlog.Logger - client client.Client -} - -func NewTargetGroupPolicyEventHandler(log gwlog.Logger, client client.Client) *targetGroupPolicyEventHandler { - return &targetGroupPolicyEventHandler{log: log, client: client} -} - -func (h *targetGroupPolicyEventHandler) getTargetRef(obj client.Object) *corev1.Service { - tgp := obj.(*v1alpha1.TargetGroupPolicy) - policyName := tgp.Namespace + "/" + tgp.Name - - targetRef := tgp.Spec.TargetRef - if targetRef == nil { - h.log.Infow("TargetGroupPolicy does not have targetRef, skipping", - "policyName", policyName) - return nil - } - if targetRef.Group != "" || targetRef.Kind != "Service" { - h.log.Infow("Detected non-Service TargetGroupPolicy attachment, skipping", - "policyName", policyName, "targetRef", targetRef) - return nil - } - namespace := tgp.Namespace - if targetRef.Namespace != nil && namespace != string(*targetRef.Namespace) { - h.log.Infow("Detected cross namespace TargetGroupPolicy attachment, skipping", - "policyName", policyName, "targetRef", targetRef) - return nil - } - - svcName := types.NamespacedName{ - Namespace: namespace, - Name: string(targetRef.Name), - } - svc := &corev1.Service{} - if err := h.client.Get(context.TODO(), svcName, svc); err != nil { - if errors.IsNotFound(err) { - h.log.Debugw("TargetGroupPolicy is referring to non-existent service, skipping", - "policyName", policyName, "serviceName", svcName.String()) - } else { - // Still gracefully skipping the event but errors other than NotFound are bad sign. - h.log.Errorw("Failed to query targetRef of TargetGroupPolicy", - "policyName", policyName, "serviceName", svcName.String(), "reason", err.Error()) - } - return nil - } - h.log.Debugw("TargetGroupPolicy change on Service detected", - "policyName", policyName, "serviceName", svcName.String()) - - return svc -} - -func (h *targetGroupPolicyEventHandler) MapToHTTPRoute(obj client.Object) []reconcile.Request { - var requests []reconcile.Request - - svc := h.getTargetRef(obj) - if svc == nil { - return nil - } - - routeList := &gateway_api.HTTPRouteList{} - h.client.List(context.TODO(), routeList) - - for _, httpRoute := range routeList.Items { - if isServiceUsedByRouteForPolicy(core.NewHTTPRoute(httpRoute), svc) { - routeName := types.NamespacedName{ - Namespace: httpRoute.Namespace, - Name: httpRoute.Name, - } - requests = append(requests, reconcile.Request{NamespacedName: routeName}) - h.log.Infow("Service TargetGroupPolicy change triggering HTTPRoute update", - "serviceName", svc.Namespace+"/"+svc.Name, "routeName", routeName.String()) - } - } - return requests -} - -func (h *targetGroupPolicyEventHandler) MapToGRPCRoute(obj client.Object) []reconcile.Request { - var requests []reconcile.Request - - svc := h.getTargetRef(obj) - if svc == nil { - return nil - } - - routeList := &gateway_api_v1alpha2.GRPCRouteList{} - h.client.List(context.TODO(), routeList) - - for _, grpcRoute := range routeList.Items { - if isServiceUsedByRouteForPolicy(core.NewGRPCRoute(grpcRoute), svc) { - routeName := types.NamespacedName{ - Namespace: grpcRoute.Namespace, - Name: grpcRoute.Name, - } - requests = append(requests, reconcile.Request{NamespacedName: routeName}) - h.log.Infow("Service TargetGroupPolicy change triggering GRPCRoute update", - "serviceName", svc.Namespace+"/"+svc.Name, "routeName", routeName.String()) - } - } - return requests -} - -func (h *targetGroupPolicyEventHandler) MapToServiceExport(obj client.Object) []reconcile.Request { - var requests []reconcile.Request - - svc := h.getTargetRef(obj) - if svc == nil { - return nil - } - svcName := types.NamespacedName{ - Namespace: svc.Namespace, - Name: svc.Name, - } - svcExport := &mcs_api.ServiceExport{} - if err := h.client.Get(context.TODO(), svcName, svcExport); err != nil { - if errors.IsNotFound(err) { - h.log.Debugw("Service does not have its ServiceExport, skipping", - "serviceName", svcName.String()) - } else { - h.log.Errorw("Failed to query matching ServiceExport", - "serviceName", svcName.String(), "reason", err.Error()) - } - return nil - } - requests = append(requests, reconcile.Request{ - NamespacedName: svcName, - }) - return requests -} - -func isServiceUsedByRouteForPolicy(route core.Route, svc *corev1.Service) bool { - for _, rule := range route.Spec().Rules() { - for _, backendRef := range rule.BackendRefs() { - isKindEqual := backendRef.Kind() != nil && string(*backendRef.Kind()) == "Service" - isNameEqual := string(backendRef.Name()) == svc.Name - - namespace := route.Namespace() - if backendRef.Namespace() != nil { - namespace = string(*backendRef.Namespace()) - } - isNamespaceEqual := namespace == svc.Namespace - - if isKindEqual && isNameEqual && isNamespaceEqual { - return true - } - } - } - return false -} diff --git a/controllers/route_controller.go b/controllers/route_controller.go index 1beeec95..d4dae6ab 100644 --- a/controllers/route_controller.go +++ b/controllers/route_controller.go @@ -37,7 +37,6 @@ import ( gateway_api_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/external-dns/endpoint" "github.com/aws/aws-application-networking-k8s/controllers/eventhandlers" @@ -86,16 +85,14 @@ func RegisterAllRouteControllers( ) error { mgrClient := mgr.GetClient() gwEventHandler := eventhandlers.NewEnqueueRequestGatewayEvent(log, mgrClient) - svcEventHandler := eventhandlers.NewEnqueueRequestForServiceWithRoutesEvent(log, mgrClient) - tgpEventHandler := eventhandlers.NewTargetGroupPolicyEventHandler(log, mgrClient) + svcEventHandler := eventhandlers.NewServiceEventHandler(log, mgrClient) routeInfos := []struct { routeType core.RouteType gatewayApiType client.Object - eventMapFunc handler.MapFunc }{ - {core.HttpRouteType, &gateway_api_v1beta1.HTTPRoute{}, tgpEventHandler.MapToHTTPRoute}, - {core.GrpcRouteType, &gateway_api_v1alpha2.GRPCRoute{}, tgpEventHandler.MapToGRPCRoute}, + {core.HttpRouteType, &gateway_api_v1beta1.HTTPRoute{}}, + {core.GrpcRouteType, &gateway_api_v1alpha2.GRPCRoute{}}, } for _, routeInfo := range routeInfos { @@ -112,16 +109,17 @@ func RegisterAllRouteControllers( stackMarshaller: deploy.NewDefaultStackMarshaller(), } - svcImportEventHandler := eventhandlers.NewEqueueRequestServiceImportEvent(log, mgrClient, routeInfo.routeType) + svcImportEventHandler := eventhandlers.NewServiceImportEventHandler(log, mgrClient) builder := ctrl.NewControllerManagedBy(mgr). For(routeInfo.gatewayApiType). Watches(&source.Kind{Type: &gateway_api_v1beta1.Gateway{}}, gwEventHandler). - Watches(&source.Kind{Type: &corev1.Service{}}, svcEventHandler). - Watches(&source.Kind{Type: &mcs_api.ServiceImport{}}, svcImportEventHandler) + Watches(&source.Kind{Type: &corev1.Service{}}, svcEventHandler.MapToRoute(routeInfo.routeType)). + Watches(&source.Kind{Type: &mcs_api.ServiceImport{}}, svcImportEventHandler.MapToRoute(routeInfo.routeType)). + Watches(&source.Kind{Type: &corev1.Endpoints{}}, svcEventHandler.MapToRoute(routeInfo.routeType)) if ok, err := k8s.IsGVKSupported(mgr, v1alpha1.GroupVersion.String(), v1alpha1.TargetGroupPolicyKind); ok { - builder.Watches(&source.Kind{Type: &v1alpha1.TargetGroupPolicy{}}, handler.EnqueueRequestsFromMapFunc(routeInfo.eventMapFunc)) + builder.Watches(&source.Kind{Type: &v1alpha1.TargetGroupPolicy{}}, svcEventHandler.MapToRoute(routeInfo.routeType)) } else { if err != nil { return err diff --git a/controllers/service_controller.go b/controllers/service_controller.go index 56d26823..dd8597bc 100644 --- a/controllers/service_controller.go +++ b/controllers/service_controller.go @@ -18,27 +18,17 @@ package controllers import ( "context" - "fmt" - - "github.com/aws/aws-application-networking-k8s/controllers/eventhandlers" + "github.com/aws/aws-application-networking-k8s/pkg/aws" + "github.com/aws/aws-application-networking-k8s/pkg/deploy" + "github.com/aws/aws-application-networking-k8s/pkg/gateway" "github.com/aws/aws-application-networking-k8s/pkg/k8s" + "github.com/aws/aws-application-networking-k8s/pkg/latticestore" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/source" - gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" - gateway_api_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" - mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" - - "github.com/aws/aws-application-networking-k8s/pkg/aws" - "github.com/aws/aws-application-networking-k8s/pkg/deploy" - "github.com/aws/aws-application-networking-k8s/pkg/gateway" - "github.com/aws/aws-application-networking-k8s/pkg/latticestore" - "github.com/aws/aws-application-networking-k8s/pkg/model/core" - latticemodel "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" ) const ( @@ -83,15 +73,8 @@ func RegisterServiceController( datastore: datastore, stackMashaller: stackMarshaller, } - epsEventsHandler := eventhandlers.NewEnqueueRequestEndpointEvent(log, client) - routeEventHandler := eventhandlers.NewEnqueueRequestRouteEvent(log, client) - serviceExportHandler := eventhandlers.NewEqueueRequestServiceExportEvent(client) err := ctrl.NewControllerManagedBy(mgr). For(&corev1.Service{}). - Watches(&source.Kind{Type: &corev1.Endpoints{}}, epsEventsHandler). - Watches(&source.Kind{Type: &gateway_api_v1beta1.HTTPRoute{}}, routeEventHandler). - Watches(&source.Kind{Type: &gateway_api_v1alpha2.GRPCRoute{}}, routeEventHandler). - Watches(&source.Kind{Type: &mcs_api.ServiceExport{}}, serviceExportHandler). Complete(sr) return err } @@ -119,50 +102,9 @@ func (r *serviceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if err := r.client.Get(ctx, req.NamespacedName, svc); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } - - tgName := latticestore.TargetGroupName(svc.Name, svc.Namespace) - tgs := r.datastore.GetTargetGroupsByName(tgName) if !svc.DeletionTimestamp.IsZero() { - for _, tg := range tgs { - r.log.Debugf("deletion request for tgName: %s: at timestamp: %s", tg.TargetGroupKey.Name, svc.DeletionTimestamp) - r.reconcileTargetsResource(ctx, svc, tg.TargetGroupKey.RouteName) - } r.finalizerManager.RemoveFinalizers(ctx, svc, serviceFinalizer) - } else { - // TODO also need to check serviceexport object to trigger building TargetGroup - for _, tg := range tgs { - r.log.Debugf("update request for tgName: %s", tg.TargetGroupKey.Name) - r.reconcileTargetsResource(ctx, svc, tg.TargetGroupKey.RouteName) - } } r.log.Infow("reconciled", "name", req.Name) return ctrl.Result{}, nil } - -func (r *serviceReconciler) reconcileTargetsResource(ctx context.Context, svc *corev1.Service, routename string) { - if err := r.finalizerManager.AddFinalizers(ctx, svc, serviceFinalizer); err != nil { - r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("failed and finalizer: %s", err)) - } - r.buildAndDeployModel(ctx, svc, routename) -} - -func (r *serviceReconciler) buildAndDeployModel(ctx context.Context, svc *corev1.Service, routename string) (core.Stack, *latticemodel.Targets, error) { - stack, latticeTargets, err := r.modelBuilder.Build(ctx, svc, routename) - if err != nil { - r.eventRecorder.Event(svc, corev1.EventTypeWarning, - k8s.ServiceEventReasonFailedBuildModel, fmt.Sprintf("failed build model: %s", err)) - return nil, nil, err - } - - jsonStack, _ := r.stackMashaller.Marshal(stack) - r.log.Debugw("successfully built model", "stack", jsonStack) - - if err := r.stackDeployer.Deploy(ctx, stack); err != nil { - r.eventRecorder.Event(svc, corev1.EventTypeWarning, - k8s.ServiceEventReasonFailedDeployModel, fmt.Sprintf("failed deploy model: %s", err)) - return nil, nil, err - } - - r.log.Debugw("successfully deployed model", "service", svc.Name) - return stack, latticeTargets, err -} diff --git a/controllers/serviceexport_controller.go b/controllers/serviceexport_controller.go index 030a75b3..0f838e20 100644 --- a/controllers/serviceexport_controller.go +++ b/controllers/serviceexport_controller.go @@ -40,7 +40,6 @@ import ( latticemodel "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" - "sigs.k8s.io/controller-runtime/pkg/handler" ) // ServiceExportReconciler reconciles a ServiceExport object @@ -87,15 +86,14 @@ func RegisterServiceExportController( stackMarshaller: stackMarshaller, } - tgpEventHandler := eventhandlers.NewTargetGroupPolicyEventHandler(log, r.client) - svcExportEventsHandler := eventhandlers.NewEqueueRequestServiceWithExportEvent(log, r.client) + svcEventHandler := eventhandlers.NewServiceEventHandler(log, r.client) builder := ctrl.NewControllerManagedBy(mgr). For(&mcs_api.ServiceExport{}). - Watches(&source.Kind{Type: &corev1.Service{}}, svcExportEventsHandler) + Watches(&source.Kind{Type: &corev1.Service{}}, svcEventHandler.MapToServiceExport()) if ok, err := k8s.IsGVKSupported(mgr, v1alpha1.GroupVersion.String(), v1alpha1.TargetGroupPolicyKind); ok { - builder.Watches(&source.Kind{Type: &v1alpha1.TargetGroupPolicy{}}, handler.EnqueueRequestsFromMapFunc(tgpEventHandler.MapToServiceExport)) + builder.Watches(&source.Kind{Type: &v1alpha1.TargetGroupPolicy{}}, svcEventHandler.MapToServiceExport()) } else { if err != nil { return err