Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tests): workaround to not rely on helpers from KTF #3980

Merged
merged 1 commit into from
May 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 145 additions & 2 deletions test/integration/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ package integration
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/kong/kubernetes-testing-framework/pkg/clusters"
"github.com/kong/kubernetes-testing-framework/pkg/clusters/types/kind"
"github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/networking"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
admregv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"

"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
testutils "github.com/kong/kubernetes-ingress-controller/v2/internal/util/test"
Expand Down Expand Up @@ -679,7 +681,7 @@ func waitForWebhookServiceConnective(ctx context.Context, configResourceName str
waitCtx, cancel := context.WithTimeout(ctx, ingressWait)
defer cancel()

return networking.WaitForConnectionOnServicePort(waitCtx, env.Cluster().Client(), consts.ControllerNamespace, svcName, svcPort, 10*time.Second)
return waitForConnectionOnServicePort(waitCtx, env.Cluster().Client(), consts.ControllerNamespace, svcName, svcPort, 10*time.Second)
}

func ensureAdmissionRegistration(ctx context.Context, configResourceName string, rules []admregv1.RuleWithOperations) (func() error, error) {
Expand Down Expand Up @@ -722,3 +724,144 @@ func ensureAdmissionRegistration(ctx context.Context, configResourceName string,

return closer, nil
}

// TODO: Below helper functions were copied from https://github.com/Kong/kubernetes-testing-framework/pull/643
// and should be removed once a new version of KTF (with that PR) will be released and it will be possible
// to upgrade it in KIC. Read more about required steps in https://github.com/Kong/kubernetes-ingress-controller/issues/3981.

// waitForServiceLoadBalancerAddress waits for a service provided by
// namespace/name to have an ingress IP or Host provisioned and returns that
// address. This function will throw an error if the service gets provisioned
// more than a single address, that is not supported. The context provided
// should have a timeout associated with it or you're going to have a bad time.
func waitForServiceLoadBalancerAddress(ctx context.Context, c kubernetes.Interface, namespace, name string) (string, bool, error) {
for {
select {
case <-ctx.Done():
return "", false, fmt.Errorf("context completed while waiting for loadbalancer service to provision: %w", ctx.Err())
default:
// retrieve a fresh copy of the service
service, err := c.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", false, fmt.Errorf("error while trying to retrieve registry service: %w", err)
}
lbing := service.Status.LoadBalancer.Ingress

// don't support services which have multiple addresses
if len(lbing) > 1 {
return "", false, fmt.Errorf("services with more than one load balancer address are not supported (found %d)", len(lbing))
}

// verify whether the loadbalancer details are provisioned
if len(lbing) == 1 {
for _, ing := range lbing {
if ing.Hostname != "" {
return ing.Hostname, false, nil
}
if ing.IP != "" {
return ing.IP, true, nil
}
}
}
}
}
}

// waitForConnectionOnServicePort waits until it can make successful TCP connections
// to a service (provided by namespace/name). This will temporarily create a LoadBalancer
// type Service to allow connections to the Service and port from outside the cluster while
// the connection attempts are made using the LoadBalancer public address.
func waitForConnectionOnServicePort(ctx context.Context, c kubernetes.Interface, namespace, name string, port int, dialTimeout time.Duration) error {
svcClient := c.CoreV1().Services(namespace)
service, err := svcClient.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
const correspondingSvcNameLabel = "corresponding-service"
lbServiceName := "templb-" + name
tempLoadBalancer := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: lbServiceName,
Labels: map[string]string{
correspondingSvcNameLabel: name,
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
// Copy the selector and ports of the service to check.
Selector: service.Spec.Selector,
Ports: service.Spec.Ports,
},
}

// Empty selector, we should create the endpoints separately.
// If the target service does not have a selector, it usually means that
// the endpoints of the target server is manually created, but not chosen from pods by labels in selector.
// so we need to manually create the same endpoints as the target service has here.
if len(service.Spec.Selector) == 0 {
epsClient := c.DiscoveryV1().EndpointSlices(namespace)
endpointSlices, err := epsClient.List(
ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + name},
)
if err != nil {
return err
}

// Recreate EndpointSlices for the lb service with proper metadata.
tempEndpointSlices := endpointSlices.DeepCopy().Items
for i := range tempEndpointSlices {
epsName := fmt.Sprintf("%s-%d", lbServiceName, i)
tempEndpointSlices[i].ObjectMeta = metav1.ObjectMeta{
Namespace: namespace,
Name: epsName,
Labels: map[string]string{
discoveryv1.LabelServiceName: lbServiceName, // Maps EndpointSlice to Service.
correspondingSvcNameLabel: name,
},
}
if _, err = epsClient.Create(ctx, &tempEndpointSlices[i], metav1.CreateOptions{}); err != nil {
return err
}
// For each successfully created temporary EndpointSlice ensure deletion on return from the function.
defer func(epsName string) {
err := epsClient.Delete(ctx, epsName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
fmt.Printf("failed to delete EndpointSlice %s/%s after testing, error %v\n",
namespace, epsName, err,
)
}
}(epsName)
}
}

if _, err = svcClient.Create(ctx, tempLoadBalancer, metav1.CreateOptions{}); err != nil {
return err
}
defer func() {
err := svcClient.Delete(ctx, lbServiceName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
fmt.Printf("failed to delete service %s/%s after testing, error %v\n",
namespace, lbServiceName, err)
}
}()

ip, _, err := waitForServiceLoadBalancerAddress(ctx, c, namespace, lbServiceName)
if err != nil {
return err
}

ticker := time.NewTicker(time.Second)
address := fmt.Sprintf("%s:%d", ip, port)
for {
select {
case <-ctx.Done():
return fmt.Errorf("context completed or dialTimeout reached while waiting for %s:%d to be connected", ip, port)
case <-ticker.C:
dialer := &net.Dialer{Timeout: dialTimeout}
if _, err := dialer.DialContext(ctx, "tcp", address); err == nil {
return nil
}
}
}
}
Loading