Skip to content

Commit

Permalink
service cache should handle duplicate endpoints addresses
Browse files Browse the repository at this point in the history
In some corner cases, a group of endpointslices may have the same
address duplicate in different slices.

Since the Endpoints are cached by address, when aggregating the
endpoints, we should merge the content instead of overwriting.

Signed-off-by: Antonio Ojea <aojea@google.com>
  • Loading branch information
aojea authored and YutaroHayakawa committed Mar 7, 2023
1 parent ed05c7e commit 305af9e
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 1 deletion.
16 changes: 15 additions & 1 deletion pkg/k8s/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,21 @@ func (es *EndpointSlices) GetEndpoints() *Endpoints {
allEps := newEndpoints()
for _, eps := range es.epSlices {
for backend, ep := range eps.Backends {
allEps.Backends[backend] = ep
// EndpointSlices may have duplicate addresses on different slices.
// kubectl get endpointslices -n endpointslicemirroring-4896
// NAME ADDRESSTYPE PORTS ENDPOINTS AGE
// example-custom-endpoints-f6z84 IPv4 9090 10.244.1.49 28s
// example-custom-endpoints-g6r6v IPv4 8090 10.244.1.49 28s
b, ok := allEps.Backends[backend]
if !ok {
allEps.Backends[backend] = ep
} else {
clone := b.DeepCopy()
for k, v := range ep.Ports {
clone.Ports[k] = v
}
allEps.Backends[backend] = clone
}
}
}
return allEps
Expand Down
217 changes: 217 additions & 0 deletions pkg/k8s/service_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,223 @@ func (s *K8sSuite) TestServiceCacheWith2EndpointSlice(c *check.C) {
}, 2*time.Second), check.IsNil)
}

func (s *K8sSuite) TestServiceCacheWith2EndpointSliceSameAddress(c *check.C) {
k8sEndpointSlice1 := &slim_discovery_v1.EndpointSlice{
AddressType: slim_discovery_v1.AddressTypeIPv4,
ObjectMeta: slim_metav1.ObjectMeta{
Name: "foo-yyyyy",
Namespace: "bar",
Labels: map[string]string{
slim_discovery_v1.LabelServiceName: "foo",
},
},
Endpoints: []slim_discovery_v1.Endpoint{
{
Addresses: []string{
"2.2.2.2",
},
},
},
Ports: []slim_discovery_v1.EndpointPort{
{
Name: func() *string { a := "http-test-svc"; return &a }(),
Protocol: func() *slim_corev1.Protocol { a := slim_corev1.ProtocolTCP; return &a }(),
Port: func() *int32 { a := int32(8080); return &a }(),
},
},
}

k8sEndpointSlice2 := &slim_discovery_v1.EndpointSlice{
AddressType: slim_discovery_v1.AddressTypeIPv4,
ObjectMeta: slim_metav1.ObjectMeta{
Name: "foo-xxxxx",
Namespace: "bar",
Labels: map[string]string{
slim_discovery_v1.LabelServiceName: "foo",
},
},
Endpoints: []slim_discovery_v1.Endpoint{
{
Addresses: []string{
"2.2.2.2",
},
},
},
Ports: []slim_discovery_v1.EndpointPort{
{
Name: func() *string { a := "http-test-svc2"; return &a }(),
Protocol: func() *slim_corev1.Protocol { a := slim_corev1.ProtocolTCP; return &a }(),
Port: func() *int32 { a := int32(8081); return &a }(),
},
},
}

svcCache := NewServiceCache(fakeDatapath.NewNodeAddressing())

k8sSvc := &slim_corev1.Service{
ObjectMeta: slim_metav1.ObjectMeta{
Name: "foo",
Namespace: "bar",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: slim_corev1.ServiceSpec{
ClusterIP: "127.0.0.1",
Selector: map[string]string{
"foo": "bar",
},
Type: slim_corev1.ServiceTypeClusterIP,
},
}

swgSvcs := lock.NewStoppableWaitGroup()
svcID := svcCache.UpdateService(k8sSvc, swgSvcs)

time.Sleep(100 * time.Millisecond)

select {
case <-svcCache.Events:
c.Error("Unexpected service event received before endpoints have been imported")
default:
}

swgEps := lock.NewStoppableWaitGroup()
svcCache.UpdateEndpointSlicesV1(k8sEndpointSlice1, swgEps)
svcCache.UpdateEndpointSlicesV1(k8sEndpointSlice2, swgEps)

// The service should be ready as both service and endpoints have been
// imported for k8sEndpointSlice1
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, UpdateService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

// The service should be ready as both service and endpoints have been
// imported for k8sEndpointSlice2
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, UpdateService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

select {
case <-svcCache.Events:
c.Error("Unexpected service event received when endpoints not selected by a service have been imported")
default:
}
endpoints, ready := svcCache.correlateEndpoints(svcID)
c.Assert(ready, check.Equals, true)
c.Assert(endpoints.String(), check.Equals, "2.2.2.2:8080/TCP,2.2.2.2:8081/TCP")

// Updating the service without changing it should not result in an event
svcCache.UpdateService(k8sSvc, swgSvcs)
time.Sleep(100 * time.Millisecond)
select {
case <-svcCache.Events:
c.Error("Unexpected service event received for unchanged service object")
default:
}

// Deleting the service will result in a service delete event
svcCache.DeleteService(k8sSvc, swgSvcs)
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, DeleteService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

// Reinserting the service should re-match with the still existing endpoints
svcCache.UpdateService(k8sSvc, swgSvcs)
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, UpdateService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

// Deleting the k8sEndpointSlice2 will result in a service update event
svcCache.DeleteEndpointSlices(k8sEndpointSlice2, swgEps)
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, UpdateService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

endpoints, ready = svcCache.correlateEndpoints(svcID)
c.Assert(ready, check.Equals, true)
c.Assert(endpoints.String(), check.Equals, "2.2.2.2:8080/TCP")

svcCache.DeleteEndpointSlices(k8sEndpointSlice1, swgEps)
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, UpdateService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

endpoints, serviceReady := svcCache.correlateEndpoints(svcID)
c.Assert(serviceReady, check.Equals, false)
c.Assert(endpoints.String(), check.Equals, "")

// Reinserting the endpoints should re-match with the still existing service
svcCache.UpdateEndpointSlicesV1(k8sEndpointSlice1, swgEps)
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, UpdateService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

endpoints, serviceReady = svcCache.correlateEndpoints(svcID)
c.Assert(serviceReady, check.Equals, true)
c.Assert(endpoints.String(), check.Equals, "2.2.2.2:8080/TCP")

// Deleting the service will result in a service delete event
svcCache.DeleteService(k8sSvc, swgSvcs)
c.Assert(testutils.WaitUntil(func() bool {
event := <-svcCache.Events
defer event.SWG.Done()
c.Assert(event.Action, check.Equals, DeleteService)
c.Assert(event.ID, check.Equals, svcID)
return true
}, 2*time.Second), check.IsNil)

// Deleting the endpoints will not emit an event as the notification
// was sent out when the service was deleted.
svcCache.DeleteEndpointSlices(k8sEndpointSlice1, swgEps)
time.Sleep(100 * time.Millisecond)
select {
case <-svcCache.Events:
c.Error("Unexpected service delete event received")
default:
}

swgSvcs.Stop()
c.Assert(testutils.WaitUntil(func() bool {
swgSvcs.Wait()
return true
}, 2*time.Second), check.IsNil)

swgEps.Stop()
c.Assert(testutils.WaitUntil(func() bool {
swgEps.Wait()
return true
}, 2*time.Second), check.IsNil)
}

func (s *K8sSuite) TestServiceEndpointFiltering(c *check.C) {
k8sSvc := &slim_corev1.Service{
ObjectMeta: slim_metav1.ObjectMeta{
Expand Down

0 comments on commit 305af9e

Please sign in to comment.