From 2d8ae3d217972022018200efa14aaf8f07eff9ee Mon Sep 17 00:00:00 2001
From: Jakub Warczarek
Date: Tue, 9 May 2023 14:24:16 +0200
Subject: [PATCH] chore(tests): workaround to not rely on helpers from KTF

---
 test/integration/webhook_test.go | 147 ++++++++++++++++++++++++++++++-
 1 file changed, 145 insertions(+), 2 deletions(-)

diff --git a/test/integration/webhook_test.go b/test/integration/webhook_test.go
index 46fb70d28e..f84849a48f 100644
--- a/test/integration/webhook_test.go
+++ b/test/integration/webhook_test.go
@@ -6,6 +6,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"net"
 	"strings"
 	"testing"
 	"time"
@@ -13,14 +14,15 @@ import (
 	"github.com/google/uuid"
 	"github.com/kong/kubernetes-testing-framework/pkg/clusters"
 	"github.com/kong/kubernetes-testing-framework/pkg/clusters/types/kind"
-	"github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/networking"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	admregv1 "k8s.io/api/admissionregistration/v1"
 	corev1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/client-go/kubernetes"
 
 	"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
 	testutils "github.com/kong/kubernetes-ingress-controller/v2/internal/util/test"
@@ -679,7 +681,7 @@ func waitForWebhookServiceConnective(ctx context.Context, configResourceName str
 	waitCtx, cancel := context.WithTimeout(ctx, ingressWait)
 	defer cancel()
 
-	return networking.WaitForConnectionOnServicePort(waitCtx, env.Cluster().Client(), consts.ControllerNamespace, svcName, svcPort, 10*time.Second)
+	return waitForConnectionOnServicePort(waitCtx, env.Cluster().Client(), consts.ControllerNamespace, svcName, svcPort, 10*time.Second)
 }
 
 func ensureAdmissionRegistration(ctx context.Context, configResourceName string, rules []admregv1.RuleWithOperations) (func() error, error) {
@@ -722,3 +724,144 @@ func ensureAdmissionRegistration(ctx context.Context, configResourceName string,
 
 	return closer, nil
 }
+
+// TODO: The helper functions below were copied from https://github.com/Kong/kubernetes-testing-framework/pull/643
+// and should be removed once a new KTF version (including that PR) is released and KIC is upgraded to it.
+// Read more about the required steps in https://github.com/Kong/kubernetes-ingress-controller/issues/3981.
+
+// waitForServiceLoadBalancerAddress waits for the service identified by
+// namespace/name to have an ingress IP or hostname provisioned and returns it,
+// along with true when the address is an IP and false when it is a hostname.
+// It returns an error if the service is provisioned with more than one load
+// balancer address, which is not supported. The context provided should carry
+// a timeout, otherwise this function may block indefinitely.
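+//
+// A minimal illustrative call (the client and service names here are
+// hypothetical, not part of this patch):
+//
+//	addr, isIP, err := waitForServiceLoadBalancerAddress(ctx, client, "default", "my-lb")
+//	// isIP is true when addr is an IP address and false when it is a hostname.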
+func waitForServiceLoadBalancerAddress(ctx context.Context, c kubernetes.Interface, namespace, name string) (string, bool, error) {
+	for {
+		select {
+		case <-ctx.Done():
+			return "", false, fmt.Errorf("context completed while waiting for load balancer service to provision: %w", ctx.Err())
+		default:
+			// Retrieve a fresh copy of the service.
+			service, err := c.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
+			if err != nil {
+				return "", false, fmt.Errorf("error while trying to retrieve service: %w", err)
+			}
+			lbing := service.Status.LoadBalancer.Ingress
+
+			// Services with multiple load balancer addresses are not supported.
+			if len(lbing) > 1 {
+				return "", false, fmt.Errorf("services with more than one load balancer address are not supported (found %d)", len(lbing))
+			}
+
+			// Verify whether the load balancer details are provisioned.
+			if len(lbing) == 1 {
+				for _, ing := range lbing {
+					if ing.Hostname != "" {
+						return ing.Hostname, false, nil
+					}
+					if ing.IP != "" {
+						return ing.IP, true, nil
+					}
+				}
+			}
+		}
+	}
+}
+
+// waitForConnectionOnServicePort waits until it can make successful TCP connections
+// to a service (identified by namespace/name). To allow connections from outside the
+// cluster, it temporarily creates a LoadBalancer type Service mirroring the target
+// Service, and makes the connection attempts against the LoadBalancer's public address.
+func waitForConnectionOnServicePort(ctx context.Context, c kubernetes.Interface, namespace, name string, port int, dialTimeout time.Duration) error {
+	svcClient := c.CoreV1().Services(namespace)
+	service, err := svcClient.Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	const correspondingSvcNameLabel = "corresponding-service"
+	lbServiceName := "templb-" + name
+	tempLoadBalancer := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+			Name:      lbServiceName,
+			Labels: map[string]string{
+				correspondingSvcNameLabel: name,
+			},
+		},
+		Spec: corev1.ServiceSpec{
+			Type: corev1.ServiceTypeLoadBalancer,
+			// Copy the selector and ports of the service to check.
+			Selector: service.Spec.Selector,
+			Ports:    service.Spec.Ports,
+		},
+	}
+
+	// If the target service has an empty selector, its endpoints are usually
+	// created manually rather than selected from Pods by labels. In that case the
+	// temporary LoadBalancer Service gets no endpoints either, so we have to clone
+	// the target service's EndpointSlices ourselves (a sketch of such a selector-less
+	// Service follows the patch).
+	if len(service.Spec.Selector) == 0 {
+		epsClient := c.DiscoveryV1().EndpointSlices(namespace)
+		endpointSlices, err := epsClient.List(
+			ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + name},
+		)
+		if err != nil {
+			return err
+		}
+
+		// Recreate the EndpointSlices for the temporary LoadBalancer Service with proper metadata.
+		tempEndpointSlices := endpointSlices.DeepCopy().Items
+		for i := range tempEndpointSlices {
+			epsName := fmt.Sprintf("%s-%d", lbServiceName, i)
+			tempEndpointSlices[i].ObjectMeta = metav1.ObjectMeta{
+				Namespace: namespace,
+				Name:      epsName,
+				Labels: map[string]string{
+					discoveryv1.LabelServiceName: lbServiceName, // Maps EndpointSlice to Service.
+					correspondingSvcNameLabel:    name,
+				},
+			}
+			if _, err = epsClient.Create(ctx, &tempEndpointSlices[i], metav1.CreateOptions{}); err != nil {
+				return err
+			}
+			// For each successfully created temporary EndpointSlice, ensure deletion on return from the function.
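+			// Deferring inside the loop is deliberate: the deletions must run only
+			// when waitForConnectionOnServicePort returns, not at the end of each
+			// iteration, and the number of cloned EndpointSlices stays small.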
+			defer func(epsName string) {
+				err := epsClient.Delete(ctx, epsName, metav1.DeleteOptions{})
+				if err != nil && !apierrors.IsNotFound(err) {
+					fmt.Printf("failed to delete EndpointSlice %s/%s after testing, error %v\n",
+						namespace, epsName, err,
+					)
+				}
+			}(epsName)
+		}
+	}
+
+	if _, err = svcClient.Create(ctx, tempLoadBalancer, metav1.CreateOptions{}); err != nil {
+		return err
+	}
+	defer func() {
+		err := svcClient.Delete(ctx, lbServiceName, metav1.DeleteOptions{})
+		if err != nil && !apierrors.IsNotFound(err) {
+			fmt.Printf("failed to delete service %s/%s after testing, error %v\n",
+				namespace, lbServiceName, err)
+		}
+	}()
+
+	// The address may be an IP or a hostname; either works for dialing.
+	address, _, err := waitForServiceLoadBalancerAddress(ctx, c, namespace, lbServiceName)
+	if err != nil {
+		return err
+	}
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	target := fmt.Sprintf("%s:%d", address, port)
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("context completed while waiting for a successful connection to %s", target)
+		case <-ticker.C:
+			dialer := &net.Dialer{Timeout: dialTimeout}
+			conn, err := dialer.DialContext(ctx, "tcp", target)
+			if err == nil {
+				conn.Close()
+				return nil
+			}
+		}
+	}
+}
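
Note: the empty-selector branch in waitForConnectionOnServicePort is the subtle
part of the copied helpers. A minimal sketch of a Service that would exercise it
(hypothetical names; no such Service is created by this patch):

	// A Service without a selector: its EndpointSlices are created by hand
	// rather than derived from Pod labels, so waitForConnectionOnServicePort
	// has to clone them for the temporary LoadBalancer Service.
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "manual-endpoints"},
		Spec: corev1.ServiceSpec{
			// No Selector field set.
			Ports: []corev1.ServicePort{{Port: 8080, TargetPort: intstr.FromInt(8080)}},
		},
	}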