Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions controllers/eventhandlers/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package eventhandlers

import (
"context"
"fmt"
"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
Expand All @@ -27,14 +26,14 @@ func NewEnqueueRequestEndpointEvent(client client.Client) handler.EventHandler {
}

func (h *enqueueRequestsForEndpointsEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
glog.V(6).Info("endpoint create")
glog.V(6).Info("Event: endpoint create")

epNew := e.Object.(*corev1.Endpoints)
h.enqueueImpactedService(queue, epNew)
}

func (h *enqueueRequestsForEndpointsEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
glog.V(6).Info("endpoints Update")
glog.V(6).Info("Event: endpoints update")
epOld := e.ObjectOld.(*corev1.Endpoints)
epNew := e.ObjectNew.(*corev1.Endpoints)
// fmt.Printf("endpoints update epOld [%v] epNew[%v]\n", epOld, epNew)
Expand All @@ -45,15 +44,16 @@ func (h *enqueueRequestsForEndpointsEvent) Update(e event.UpdateEvent, queue wor
}

func (h *enqueueRequestsForEndpointsEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
fmt.Printf("TODO endpoints Delete \n")
glog.V(6).Infof("Event: endpoints delete")
// service event handler handles this event here
}

func (h *enqueueRequestsForEndpointsEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {

}

func (h *enqueueRequestsForEndpointsEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, ep *corev1.Endpoints) {
glog.V(6).Infof("enqueueImpactedService [%v]", ep)
glog.V(6).Infof("Event: enqueueImpactedService [%v]", ep)

var targetIPList []string

Expand All @@ -76,7 +76,7 @@ func (h *enqueueRequestsForEndpointsEvent) enqueueImpactedService(queue workqueu
}

if err := h.client.Get(context.TODO(), namespaceName, svc); err != nil {
glog.V(2).Infof("enqueueImpactedService, service not found %v\n", err)
glog.V(6).Infof("Event: enqueueImpactedService, service not found %v\n", err)
return
}

Expand Down
32 changes: 28 additions & 4 deletions controllers/eventhandlers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,48 @@ func NewEqueueRequestServiceEvent(client client.Client) handler.EventHandler {
}

func (h *enqueueRequetsForServiceEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
glog.V(6).Info("Event: service create")
service := e.Object.(*corev1.Service)
h.enqueueImpactedService(queue, service)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

being consistent , can we add glog.V(6) here

h.enqueueImpactedServiceExport(queue, service)
}

func (h *enqueueRequetsForServiceEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
}

func (h *enqueueRequetsForServiceEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
glog.V(6).Info("Event: service delete")
service := e.Object.(*corev1.Service)
h.enqueueImpactedService(queue, service)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

h.enqueueImpactedServiceExport(queue, service)
}

func (h *enqueueRequetsForServiceEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {

}

func (h *enqueueRequetsForServiceEvent) enqueueImpactedService(queue workqueue.RateLimitingInterface, ep *corev1.Service) {
glog.V(6).Infof("Event: enqueueImpactedService: %v\n", ep)

srv := &corev1.Service{}
namespacedName := types.NamespacedName{
Namespace: ep.Namespace,
Name: ep.Name,
}

if err := h.client.Get(context.TODO(), namespacedName, srv); err != nil {
glog.V(6).Infof("Event: enqueueImpactedService, service not found %v\n", err)
return
}

queue.Add(reconcile.Request{
NamespacedName: namespacedName,
})

}

func (h *enqueueRequetsForServiceEvent) enqueueImpactedServiceExport(queue workqueue.RateLimitingInterface, ep *corev1.Service) {
glog.V(6).Infof("enqueueImpactedServiceExport: %v\n", ep)
glog.V(6).Infof("Event: enqueueImpactedServiceExport: %v\n", ep)

srvExport := &mcs_api.ServiceExport{}
namespacedName := types.NamespacedName{
Expand All @@ -54,7 +78,7 @@ func (h *enqueueRequetsForServiceEvent) enqueueImpactedServiceExport(queue workq
}

if err := h.client.Get(context.TODO(), namespacedName, srvExport); err != nil {
glog.V(6).Infof("enqueueImpactedServiceExport, serviceexport not found %v\n", err)
glog.V(6).Infof("Event: enqueueImpactedServiceExport, serviceexport not found %v\n", err)
return
}

Expand Down Expand Up @@ -91,7 +115,7 @@ func (h *enqueueHTTPRequetsForServiceEvent) Generic(e event.GenericEvent, queue
}

func (h *enqueueHTTPRequetsForServiceEvent) enqueueImpactedHTTPRoute(queue workqueue.RateLimitingInterface, ep *corev1.Service) {
glog.V(6).Infof("enqueueImpactedHTTPRoute: %v\n", ep)
glog.V(6).Infof("Event: enqueueImpactedHTTPRoute: %v\n", ep)

httpRouteList := &gateway_api.HTTPRouteList{}

Expand All @@ -101,7 +125,7 @@ func (h *enqueueHTTPRequetsForServiceEvent) enqueueImpactedHTTPRoute(queue workq
if !isServiceUsedByHTTPRoute(httpRoute, ep) {
continue
}
glog.V(6).Infof("enqueueImpactedHTTPRoute --> httproute %v \n", httpRoute)
glog.V(6).Infof("Event: enqueueImpactedHTTPRoute --> httproute %v \n", httpRoute)
namespacedName := types.NamespacedName{
Namespace: httpRoute.Namespace,
Name: httpRoute.Name,
Expand Down
16 changes: 11 additions & 5 deletions controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
)

const (
// Typo
serviceFinalizer = "service.ki8s.aws/resources"
)

Expand Down Expand Up @@ -101,23 +102,29 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
svcLog.Info("ServiceReconciler")

svc := &corev1.Service{}
ds := r.latticeDataStore

if err := r.Client.Get(ctx, req.NamespacedName, svc); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !svc.DeletionTimestamp.IsZero() {
tgNameD := latticestore.TargetGroupName(svc.Name, svc.Namespace)
TGDeleted := ds.GetTargetGroupsByTG(tgNameD)
for _, tg := range TGDeleted {
glog.V(6).Infof("service deletion trigger target IP list registration %v and tg %v\n",
tgNameD, tg)
r.reconcileTargetsResource(ctx, svc, tg.TargetGroupKey.RouteName)

}
r.finalizerManager.RemoveFinalizers(ctx, svc, serviceFinalizer)
return ctrl.Result{}, nil

return ctrl.Result{}, nil
}

ds := r.latticeDataStore

// TODO also need to check serviceexport object to trigger building TargetGroup
tgName := latticestore.TargetGroupName(svc.Name, svc.Namespace)
TGs := ds.GetTargetGroupsByTG(tgName) // isServiceImport = false

for _, tg := range TGs {

glog.V(6).Infof("endpoints change trigger target IP list registration %v and tg %v\n",
Expand All @@ -131,7 +138,6 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

func (r *ServiceReconciler) reconcileTargetsResource(ctx context.Context, svc *corev1.Service, routename string) {

if err := r.finalizerManager.AddFinalizers(ctx, svc, serviceFinalizer); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed and finalizer due %v", err))
}
Expand Down
34 changes: 20 additions & 14 deletions pkg/gateway/model_build_targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,34 +97,40 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context)

endPoints := &corev1.Endpoints{}

svc := &corev1.Service{}
namespacedName := types.NamespacedName{
Namespace: t.tgNamespace,
Name: t.tgName,
}

if err := t.Client.Get(ctx, namespacedName, endPoints); err != nil {
if err := t.Client.Get(ctx, namespacedName, svc); err != nil {
errmsg := fmt.Sprintf("Build Targets failed because K8S service %v does not exist", namespacedName)
glog.V(6).Infof("errmsg: %v\n", errmsg)
return errors.New(errmsg)
}

glog.V(6).Infof("Build Targets: endPoints %v \n", endPoints)
var targetList []latticemodel.Target

for _, endPoint := range endPoints.Subsets {
if svc.DeletionTimestamp.IsZero() {
if err := t.Client.Get(ctx, namespacedName, endPoints); err != nil {
errmsg := fmt.Sprintf("Build Targets failed because K8S service %v does not exist", namespacedName)
glog.V(6).Infof("errmsg: %v\n", errmsg)
return errors.New(errmsg)
}

glog.V(6).Infof("Build Targets: endPoints %v \n", endPoints)

for _, address := range endPoint.Addresses {
for _, port := range endPoint.Ports {
glog.V(6).Infof("serviceReconcile-endpoints: address %v, port %v\n", address, port)
target := latticemodel.Target{
TargetIP: address.IP,
Port: int64(port.Port),
for _, endPoint := range endPoints.Subsets {

for _, address := range endPoint.Addresses {
for _, port := range endPoint.Ports {
glog.V(6).Infof("serviceReconcile-endpoints: address %v, port %v\n", address, port)
target := latticemodel.Target{
TargetIP: address.IP,
Port: int64(port.Port),
}
targetList = append(targetList, target)
}
targetList = append(targetList, target)
}

}

}

glog.V(6).Infof("Build Targets--- targetIPList [%v]\n", targetList)
Expand Down
90 changes: 90 additions & 0 deletions pkg/gateway/model_build_targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand All @@ -26,6 +27,7 @@ func Test_Targets(t *testing.T) {
srvExportName string
srvExportNamespace string
endPoints []corev1.Endpoints
svc corev1.Service
inDataStore bool
refByServiceExport bool
refByService bool
Expand All @@ -50,6 +52,13 @@ func Test_Targets(t *testing.T) {
},
},
},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: nil,
},
},
inDataStore: true,
refByServiceExport: true,
wantErrIsNil: true,
Expand All @@ -72,6 +81,57 @@ func Test_Targets(t *testing.T) {
},
},
},
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a unit test to test service delete ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test is covered at line 85-115

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, can u add a new test for just service add without HTTProute stuff...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created issue for adding e2etest: #257

name: "Delete svc and all endpoints to build spec",
srvExportName: "export1",
srvExportNamespace: "ns1",
endPoints: []corev1.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{{IP: "10.10.1.1"}, {IP: "10.10.2.2"}},
Ports: []corev1.EndpointPort{{Name: "a", Port: 8675}, {Name: "b", Port: 309}},
},
},
},
},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: &metav1.Time{
Time: time.Now(),
},
},
},
inDataStore: true,
refByServiceExport: true,
wantErrIsNil: true,
expectedTargetList: nil,
},
{
name: "Delete svc and no endpoints to build spec",
srvExportName: "export1",
srvExportNamespace: "ns1",
endPoints: []corev1.Endpoints{},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: &metav1.Time{
Time: time.Now(),
},
},
},
inDataStore: true,
refByServiceExport: true,
wantErrIsNil: true,
expectedTargetList: nil,
},
{
name: "Endpoints without TargetGroup",
srvExportName: "export2",
Expand All @@ -90,6 +150,13 @@ func Test_Targets(t *testing.T) {
},
},
},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: nil,
},
},
inDataStore: false,
refByServiceExport: true,
wantErrIsNil: false,
Expand All @@ -112,6 +179,13 @@ func Test_Targets(t *testing.T) {
},
},
},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: nil,
},
},
inDataStore: true,
refByServiceExport: false,
refByService: false,
Expand All @@ -124,6 +198,13 @@ func Test_Targets(t *testing.T) {
inDataStore: false,
refByServiceExport: true,
wantErrIsNil: false,
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: nil,
},
},
},
{
name: "Add all endpoints to build spec",
Expand All @@ -143,6 +224,13 @@ func Test_Targets(t *testing.T) {
},
},
},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export5",
DeletionTimestamp: nil,
},
},
inDataStore: true,
refByService: true,
wantErrIsNil: true,
Expand Down Expand Up @@ -180,6 +268,8 @@ func Test_Targets(t *testing.T) {
assert.NoError(t, k8sClient.Create(ctx, tt.endPoints[0].DeepCopy()))
}

assert.NoError(t, k8sClient.Create(ctx, tt.svc.DeepCopy()))

ds := latticestore.NewLatticeDataStore()

if tt.inDataStore {
Expand Down