Skip to content

Commit

Permalink
Improving port forwarding error handling (#1839)
Browse files Browse the repository at this point in the history
* adding error handling hook to portforwarder. documenting exported symbols. removing unnecessary build constraints

* improving the port forward wrapper api
  • Loading branch information
ramiro-gamarra committed Sep 8, 2023
1 parent feb03c0 commit 4c2eb0d
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 109 deletions.
142 changes: 74 additions & 68 deletions test/integration/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ package k8s

import (
"context"
"log"

//"dnc/test/integration/goldpinger"
"errors"
"flag"
Expand Down Expand Up @@ -43,7 +41,6 @@ var (
kubeconfig = flag.String("test-kubeconfig", filepath.Join(homedir.HomeDir(), ".kube", "config"), "(optional) absolute path to the kubeconfig file")
delegatedSubnetID = flag.String("delegated-subnet-id", "", "delegated subnet id for node labeling")
delegatedSubnetName = flag.String("subnet-name", "", "subnet name for node labeling")
gpPodScaleCounts = []int{2, 10, 100, 2}
)

func shouldLabelNodes() bool {
Expand Down Expand Up @@ -141,8 +138,12 @@ func TestPodScaling(t *testing.T) {
}
})

podsClient := clientset.CoreV1().Pods(deployment.Namespace)

gpPodScaleCounts := []int{2, 10, 100, 2}
for _, c := range gpPodScaleCounts {
count := c

t.Run(fmt.Sprintf("replica count %d", count), func(t *testing.T) {
replicaCtx, cancel := context.WithTimeout(ctx, (retryAttempts+1)*retryDelaySec)
defer cancel()
Expand All @@ -151,93 +152,98 @@ func TestPodScaling(t *testing.T) {
t.Fatalf("could not scale deployment: %v", err)
}

if !t.Run("all pods have IPs assigned", func(t *testing.T) {
podsClient := clientset.CoreV1().Pods(deployment.Namespace)
t.Log("checking that all pods have IPs assigned")

checkPodIPsFn := func() error {
podList, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "app=goldpinger"})
if err != nil {
return err
}
checkPodIPsFn := func() error {
podList, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "app=goldpinger"})
if err != nil {
return err
}

if len(podList.Items) == 0 {
return errors.New("no pods scheduled")
}
if len(podList.Items) == 0 {
return errors.New("no pods scheduled")
}

for _, pod := range podList.Items {
if pod.Status.Phase == apiv1.PodPending {
return errors.New("some pods still pending")
}
for _, pod := range podList.Items {
if pod.Status.Phase == apiv1.PodPending {
return errors.New("some pods still pending")
}
}

for _, pod := range podList.Items {
if pod.Status.PodIP == "" {
return errors.New("a pod has not been allocated an IP")
}
for _, pod := range podList.Items {
if pod.Status.PodIP == "" {
return errors.New("a pod has not been allocated an IP")
}

return nil
}
err := defaultRetrier.Do(ctx, checkPodIPsFn)

return nil
}

if err := defaultRetrier.Do(ctx, checkPodIPsFn); err != nil {
t.Fatalf("not all pods were allocated IPs: %v", err)
}

t.Log("all pods have been allocated IPs")
t.Log("checking that all pods can ping each other")

clusterCheckCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
defer cancel()

pfOpts := PortForwardingOpts{
Namespace: "default",
LabelSelector: "type=goldpinger-pod",
LocalPort: 9090,
DestPort: 8080,
}

pingCheckFn := func() error {
pf, err := NewPortForwarder(restConfig, t, pfOpts)
if err != nil {
t.Fatalf("not all pods were allocated IPs: %v", err)
t.Fatalf("could not build port forwarder: %v", err)
}
t.Log("all pods have been allocated IPs")
}) {
errors.New("Pods don't have IP's")
return
}

t.Run("all pods can ping each other", func(t *testing.T) {
clusterCheckCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
portForwardCtx, cancel := context.WithTimeout(ctx, (retryAttempts+1)*retryDelaySec)
defer cancel()
clusterCheckFn := func() error {
pf, err := NewPortForwarder(restConfig)
if err != nil {
t.Fatal(err)
}

portForwardCtx, cancel := context.WithTimeout(ctx, (retryAttempts+1)*retryDelaySec)
defer cancel()

var streamHandle PortForwardStreamHandle
portForwardFn := func() error {
log.Printf("attempting port forward")
handle, err := pf.Forward(ctx, "default", "type=goldpinger-pod", 9090, 8080)
if err != nil {
return err
}
portForwardFn := func() error {
t.Log("attempting port forward")

streamHandle = handle
return nil
if err := pf.Forward(portForwardCtx); err != nil {
return fmt.Errorf("could not start port forward: %w", err)
}
if err := defaultRetrier.Do(portForwardCtx, portForwardFn); err != nil {
t.Fatalf("could not start port forward within %v: %v", retryDelaySec.String(), err)
}
defer streamHandle.Stop()

gpClient := goldpinger.Client{Host: streamHandle.Url()}
return nil
}

clusterState, err := gpClient.CheckAll(clusterCheckCtx)
if err != nil {
return err
}
if err := defaultRetrier.Do(portForwardCtx, portForwardFn); err != nil {
t.Fatalf("could not start port forward within %v: %v", retryDelaySec.String(), err)
}

stats := goldpinger.ClusterStats(clusterState)
stats.PrintStats()
if stats.AllPingsHealthy() {
return nil
}
go pf.KeepAlive(clusterCheckCtx)

defer pf.Stop()

gpClient := goldpinger.Client{Host: pf.Address()}

return errors.New("not all pings are healthy")
clusterState, err := gpClient.CheckAll(clusterCheckCtx)
if err != nil {
return fmt.Errorf("could not check all goldpinger pods: %w", err)
}

if err := defaultRetrier.Do(clusterCheckCtx, clusterCheckFn); err != nil {
t.Fatalf("cluster could not reach healthy state: %v", err)
stats := goldpinger.ClusterStats(clusterState)
stats.PrintStats()
if stats.AllPingsHealthy() {
return nil
}

t.Log("all pings successful!")
})
return errors.New("not all pings are healthy")
}

if err := defaultRetrier.Do(clusterCheckCtx, pingCheckFn); err != nil {
t.Fatalf("cluster could not reach healthy state: %v", err)
}

t.Log("all pings successful!")
})
}
}
Expand Down
2 changes: 0 additions & 2 deletions test/integration/label.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build integration

package k8s

import (
Expand Down

0 comments on commit 4c2eb0d

Please sign in to comment.