Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(k8s): Switch from CoreV1 Endpoints to DiscoveryV1 EndpointSlice #134

Merged
merged 1 commit into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 58 additions & 65 deletions pkg/internal/meshdetect/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -267,66 +268,65 @@ func isPodInitContainerInjected(meshKind MeshKind, pod *corev1.Pod) bool {
return false
}

// listAllSevices lists all services in all namespaces.
// returns slice if all services.
func (d *Detector) listAllSevices(ctx context.Context, pageSize int) ([]*corev1.Service, error) {
// listAllServices returns all services in all namespaces.
func (d *Detector) listAllServices(ctx context.Context, pageSize int) ([]*corev1.Service, error) {
serviceList := []*corev1.Service{}
continueToken := ""
for {
partialServiceList := &corev1.ServiceList{}
err := d.Client.List(ctx, partialServiceList, client.Limit(pageSize), client.Continue(continueToken))
services := &corev1.ServiceList{}
err := d.Client.List(ctx, services, client.Limit(pageSize), client.Continue(continueToken))
if err != nil {
return nil, err
}

for i := range partialServiceList.Items {
serviceList = append(serviceList, &partialServiceList.Items[i])
for i := range services.Items {
serviceList = append(serviceList, &services.Items[i])
}

continueToken = partialServiceList.GetContinue()
continueToken = services.GetContinue()

if partialServiceList.RemainingItemCount == nil || *partialServiceList.RemainingItemCount == 0 {
if services.RemainingItemCount == nil || *services.RemainingItemCount == 0 {
break
}
}

return serviceList, nil
}

// listAllEndpoints lists all endpoints in all namespaces.
// returns map: namespaced name of endpoints -> endpoints resource
// listAllEndpointsPerService lists all Endpoints per every Service in every namespaces.
// returns map: namespaced name of Service -> slice of Endpoint resources
//
// example: client.ObjectKey{Namespace: "default", Name: "service1"} ->
//
// &corev1.Endpoints{
// ObjectMeta: metav1.ObjectMeta {
// Namespace: "default",
// Name: "service1", ...
// },
// Subsets: ...,
// ...
// []discoveryv1.Endpoint{
// { TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"...}... },
// { TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod2"...}... },
// ...
// }.
func (d *Detector) listAllEndpoints(ctx context.Context, pageSize int) (
map[client.ObjectKey]*corev1.Endpoints, error,
func (d *Detector) listAllEndpointsPerService(ctx context.Context, pageSize int) (
map[client.ObjectKey][]discoveryv1.Endpoint, error,
) {
endpointsMap := map[client.ObjectKey]*corev1.Endpoints{}
continueToken := ""
endpointsMap := map[client.ObjectKey][]discoveryv1.Endpoint{}
var continueToken string
for {
partialEndpointsList := &corev1.EndpointsList{}
err := d.Client.List(ctx, partialEndpointsList, client.Limit(pageSize), client.Continue(continueToken))
if err != nil {
endpoints := &discoveryv1.EndpointSliceList{}
if err := d.Client.List(
ctx, endpoints, client.Limit(pageSize), client.Continue(continueToken),
); err != nil {
return nil, err
}

for i := range partialEndpointsList.Items {
ep := &partialEndpointsList.Items[i]
key := client.ObjectKey{Namespace: ep.Namespace, Name: ep.Name}
endpointsMap[key] = ep
for i := range endpoints.Items {
es := &endpoints.Items[i]
svcName := es.Labels[discoveryv1.LabelServiceName]
if svcName == "" {
continue
}
key := client.ObjectKey{Namespace: es.Namespace, Name: svcName}
endpointsMap[key] = append(endpointsMap[key], es.Endpoints...)
}

continueToken = partialEndpointsList.GetContinue()

if partialEndpointsList.RemainingItemCount == nil || *partialEndpointsList.RemainingItemCount == 0 {
continueToken = endpoints.GetContinue()
if endpoints.RemainingItemCount == nil || *endpoints.RemainingItemCount == 0 {
break
}
}
Expand Down Expand Up @@ -376,12 +376,12 @@ func (d *Detector) DetectServiceDistribution(ctx context.Context) (*ServiceDistr
// list all services, endpoints and pods to check whether
// pods behind each service is injected by each service mesh.

serviceList, err := d.listAllSevices(ctx, defaultPageSize)
serviceList, err := d.listAllServices(ctx, defaultPageSize)
if err != nil {
return nil, errors.Wrap(err, "failed to list services in cluster")
}

endpoints, err := d.listAllEndpoints(ctx, defaultPageSize)
endpointsPerSvc, err := d.listAllEndpointsPerService(ctx, defaultPageSize)
if err != nil {
return nil, errors.Wrap(err, "failed to list endpoints in cluster")
}
Expand All @@ -397,46 +397,39 @@ func (d *Detector) DetectServiceDistribution(ctx context.Context) (*ServiceDistr
}

for _, svc := range serviceList {
key := client.ObjectKeyFromObject(svc)
endpointsResource := endpoints[key]
if endpointsResource == nil {
endpoints := endpointsPerSvc[client.ObjectKeyFromObject(svc)]
if endpoints == nil {
continue
}

// injected is set to true if the service(pod) is injected by mesh.
// Injected is set to true if the service(pod) is injected by mesh.
injected := map[MeshKind]bool{}

// detect if service has annotations to indicate that the service is injected
// (only for traefik)
// Detect if service has annotations to indicate that the service is injected
// (only for traefik).
for meshKind := range meshServiceAnnotations {
injected[meshKind] = isServiceInjected(meshKind, svc)
}

for _, subset := range endpointsResource.Subsets {
for _, address := range subset.Addresses {
// skip if the target endpoint address is not a pod.
if address.TargetRef == nil {
continue
}
if address.TargetRef.Kind != "Pod" {
continue
}

// if one of the pod is injected, we consider this service as running under the mesh.
podKey := client.ObjectKey{
Namespace: address.TargetRef.Namespace,
Name: address.TargetRef.Name,
}
pod := pods[podKey]
if pod == nil {
continue
}
for _, endpoint := range endpoints {
if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
continue
}
// If one of the Pods is injected, we consider this service as running under the mesh.
podKey := client.ObjectKey{
Namespace: endpoint.TargetRef.Namespace,
Name: endpoint.TargetRef.Name,
}
pod := pods[podKey]
if pod == nil {
continue
}

for _, meshKind := range MeshesToDetect {
// set injected to true if one of pods in service is injected with sidecar and init container.
injected[meshKind] = injected[meshKind] ||
(isPodSidecarInjected(meshKind, pod) || isPodInitContainerInjected(meshKind, pod))
}
for _, meshKind := range MeshesToDetect {
// Set injected to true if one of pods in service is injected with sidecar and init container.
injected[meshKind] = injected[meshKind] ||
isPodSidecarInjected(meshKind, pod) ||
isPodInitContainerInjected(meshKind, pod)
}
}

Expand Down
Loading