Skip to content

Commit

Permalink
feat(k8s): Switch from CoreV1 Endpoints to DiscoveryV1 EndpointSlice
Browse files Browse the repository at this point in the history
  • Loading branch information
programmer04 committed May 8, 2023
1 parent b4c8126 commit 6ca3e7d
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 149 deletions.
123 changes: 58 additions & 65 deletions pkg/internal/meshdetect/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -267,66 +268,65 @@ func isPodInitContainerInjected(meshKind MeshKind, pod *corev1.Pod) bool {
return false
}

// listAllSevices lists all services in all namespaces.
// returns slice if all services.
func (d *Detector) listAllSevices(ctx context.Context, pageSize int) ([]*corev1.Service, error) {
// listAllServices returns all services in all namespaces.
func (d *Detector) listAllServices(ctx context.Context, pageSize int) ([]*corev1.Service, error) {
serviceList := []*corev1.Service{}
continueToken := ""
for {
partialServiceList := &corev1.ServiceList{}
err := d.Client.List(ctx, partialServiceList, client.Limit(pageSize), client.Continue(continueToken))
services := &corev1.ServiceList{}
err := d.Client.List(ctx, services, client.Limit(pageSize), client.Continue(continueToken))
if err != nil {
return nil, err
}

for i := range partialServiceList.Items {
serviceList = append(serviceList, &partialServiceList.Items[i])
for i := range services.Items {
serviceList = append(serviceList, &services.Items[i])
}

continueToken = partialServiceList.GetContinue()
continueToken = services.GetContinue()

if partialServiceList.RemainingItemCount == nil || *partialServiceList.RemainingItemCount == 0 {
if services.RemainingItemCount == nil || *services.RemainingItemCount == 0 {
break
}
}

return serviceList, nil
}

// listAllEndpoints lists all endpoints in all namespaces.
// returns map: namespaced name of endpoints -> endpoints resource
// listAllEndpointsPerService lists all Endpoints per every Service in every namespaces.
// returns map: namespaced name of Service -> slice of Endpoint resources
//
// example: client.ObjectKey{Namespace: "default", Name: "service1"} ->
//
// &corev1.Endpoints{
// ObjectMeta: metav1.ObjectMeta {
// Namespace: "default",
// Name: "service1", ...
// },
// Subsets: ...,
// ...
// []discoveryv1.Endpoint{
// { TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"...}... },
// { TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod2"...}... },
// ...
// }.
func (d *Detector) listAllEndpoints(ctx context.Context, pageSize int) (
map[client.ObjectKey]*corev1.Endpoints, error,
func (d *Detector) listAllEndpointsPerService(ctx context.Context, pageSize int) (
map[client.ObjectKey][]discoveryv1.Endpoint, error,
) {
endpointsMap := map[client.ObjectKey]*corev1.Endpoints{}
continueToken := ""
endpointsMap := map[client.ObjectKey][]discoveryv1.Endpoint{}
var continueToken string
for {
partialEndpointsList := &corev1.EndpointsList{}
err := d.Client.List(ctx, partialEndpointsList, client.Limit(pageSize), client.Continue(continueToken))
if err != nil {
endpoints := &discoveryv1.EndpointSliceList{}
if err := d.Client.List(
ctx, endpoints, client.Limit(pageSize), client.Continue(continueToken),
); err != nil {
return nil, err
}

for i := range partialEndpointsList.Items {
ep := &partialEndpointsList.Items[i]
key := client.ObjectKey{Namespace: ep.Namespace, Name: ep.Name}
endpointsMap[key] = ep
for i := range endpoints.Items {
es := &endpoints.Items[i]
svcName := es.Labels[discoveryv1.LabelServiceName]
if svcName == "" {
continue
}
key := client.ObjectKey{Namespace: es.Namespace, Name: svcName}
endpointsMap[key] = append(endpointsMap[key], es.Endpoints...)
}

continueToken = partialEndpointsList.GetContinue()

if partialEndpointsList.RemainingItemCount == nil || *partialEndpointsList.RemainingItemCount == 0 {
continueToken = endpoints.GetContinue()
if endpoints.RemainingItemCount == nil || *endpoints.RemainingItemCount == 0 {
break
}
}
Expand Down Expand Up @@ -376,12 +376,12 @@ func (d *Detector) DetectServiceDistribution(ctx context.Context) (*ServiceDistr
// list all services, endpoints and pods to check whether
// pods behind each service is injected by each service mesh.

serviceList, err := d.listAllSevices(ctx, defaultPageSize)
serviceList, err := d.listAllServices(ctx, defaultPageSize)
if err != nil {
return nil, errors.Wrap(err, "failed to list services in cluster")
}

endpoints, err := d.listAllEndpoints(ctx, defaultPageSize)
endpointsPerSvc, err := d.listAllEndpointsPerService(ctx, defaultPageSize)
if err != nil {
return nil, errors.Wrap(err, "failed to list endpoints in cluster")
}
Expand All @@ -397,46 +397,39 @@ func (d *Detector) DetectServiceDistribution(ctx context.Context) (*ServiceDistr
}

for _, svc := range serviceList {
key := client.ObjectKeyFromObject(svc)
endpointsResource := endpoints[key]
if endpointsResource == nil {
endpoints := endpointsPerSvc[client.ObjectKeyFromObject(svc)]
if endpoints == nil {
continue
}

// injected is set to true if the service(pod) is injected by mesh.
// Injected is set to true if the service(pod) is injected by mesh.
injected := map[MeshKind]bool{}

// detect if service has annotations to indicate that the service is injected
// (only for traefik)
// Detect if service has annotations to indicate that the service is injected
// (only for traefik).
for meshKind := range meshServiceAnnotations {
injected[meshKind] = isServiceInjected(meshKind, svc)
}

for _, subset := range endpointsResource.Subsets {
for _, address := range subset.Addresses {
// skip if the target endpoint address is not a pod.
if address.TargetRef == nil {
continue
}
if address.TargetRef.Kind != "Pod" {
continue
}

// if one of the pod is injected, we consider this service as running under the mesh.
podKey := client.ObjectKey{
Namespace: address.TargetRef.Namespace,
Name: address.TargetRef.Name,
}
pod := pods[podKey]
if pod == nil {
continue
}
for _, endpoint := range endpoints {
if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
continue
}
// If one of the Pods is injected, we consider this service as running under the mesh.
podKey := client.ObjectKey{
Namespace: endpoint.TargetRef.Namespace,
Name: endpoint.TargetRef.Name,
}
pod := pods[podKey]
if pod == nil {
continue
}

for _, meshKind := range MeshesToDetect {
// set injected to true if one of pods in service is injected with sidecar and init container.
injected[meshKind] = injected[meshKind] ||
(isPodSidecarInjected(meshKind, pod) || isPodInitContainerInjected(meshKind, pod))
}
for _, meshKind := range MeshesToDetect {
// Set injected to true if one of pods in service is injected with sidecar and init container.
injected[meshKind] = injected[meshKind] ||
isPodSidecarInjected(meshKind, pod) ||
isPodInitContainerInjected(meshKind, pod)
}
}

Expand Down
Loading

0 comments on commit 6ca3e7d

Please sign in to comment.