Skip to content

Commit

Permalink
Enable traceflow e2e test on Windows
Browse files Browse the repository at this point in the history
Signed-off-by: gran <gran@vmware.com>
  • Loading branch information
gran-vmv committed Nov 23, 2021
1 parent c040593 commit cf8e369
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 42 deletions.
2 changes: 1 addition & 1 deletion ci/jenkins/test.sh
Expand Up @@ -570,7 +570,7 @@ function run_e2e_windows {

set +e
mkdir -p `pwd`/antrea-test-logs
go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=50m --prometheus
go test -v -run=TestTraceflow antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=50m --prometheus
if [[ "$?" != "0" ]]; then
TEST_FAILURE=true
fi
Expand Down
12 changes: 12 additions & 0 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -338,6 +339,7 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error {
matchPacket = packet
}
klog.V(2).Infof("Traceflow packet %v", *packet)
klog.Infof("AAAA Traceflow packet %+v", *packet)
}

// Store Traceflow to cache.
Expand Down Expand Up @@ -373,7 +375,17 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error {
time.Sleep(time.Duration(injectLocalPacketDelay) * time.Millisecond)
}
klog.V(2).Infof("Injecting packet for Traceflow %s", tf.Name)
klog.Infof("AAAA Injecting packet for Traceflow %s", tf.Name)
err = c.ofClient.SendTraceflowPacket(tfState.tag, packet, ofPort, -1)

type Traceflow struct {
Status crdv1alpha1.TraceflowStatus `json:"status,omitempty"`
}
reasonMap := map[string]string{"in_port": strconv.Itoa(int(ofPort)), "dl_src": packet.SourceMAC.String(), "dl_dst": c.nodeConfig.GatewayConfig.MAC.String(), "nw_src": packet.SourceIP.String(), "nw_dst": packet.DestinationIP.String(), "nw_tos": strconv.Itoa(int(tfState.tag << 2))}
reason, _ := json.Marshal(reasonMap)
patchData := Traceflow{Status: crdv1alpha1.TraceflowStatus{Reason: string(reason)}}
payloads, _ := json.Marshal(patchData)
c.traceflowClient.CrdV1alpha1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
}
return err
}
Expand Down
18 changes: 15 additions & 3 deletions test/e2e/fixtures.go
Expand Up @@ -415,6 +415,18 @@ func deletePodWrapper(tb testing.TB, data *TestData, namespace, name string) {
// created Pods. Pods are created in parallel to reduce the time required to run the tests.
func createTestBusyboxPods(tb testing.TB, data *TestData, num int, ns string, nodeName string) (
podNames []string, podIPs []*PodIPs, cleanupFn func(),
) {
return createTestPods(tb, data, num, ns, nodeName, data.createBusyboxPodOnNode)
}

func createTestAgnhostPods(tb testing.TB, data *TestData, num int, ns string, nodeName string) (
podNames []string, podIPs []*PodIPs, cleanupFn func(),
) {
return createTestPods(tb, data, num, ns, nodeName, data.createAgnhostPodOnNode)
}

func createTestPods(tb testing.TB, data *TestData, num int, ns string, nodeName string, createFunc func(string, string, string, bool) error) (
podNames []string, podIPs []*PodIPs, cleanupFn func(),
) {
cleanupFn = func() {
var wg sync.WaitGroup
Expand All @@ -436,9 +448,9 @@ func createTestBusyboxPods(tb testing.TB, data *TestData, num int, ns string, no

createPodAndGetIP := func() (string, *PodIPs, error) {
podName := randName("test-pod-")
tb.Logf("Creating a busybox test Pod '%s' and waiting for IP", podName)
if err := data.createBusyboxPodOnNode(podName, ns, nodeName, false); err != nil {
tb.Errorf("Error when creating busybox test Pod '%s': %v", podName, err)
tb.Logf("Creating a test Pod '%s' and waiting for IP", podName)
if err := createFunc(podName, ns, nodeName, false); err != nil {
tb.Errorf("Error when creating test Pod '%s': %v", podName, err)
return "", nil, err
}
podIP, err := data.podWaitForIPs(defaultTimeout, podName, ns)
Expand Down
116 changes: 78 additions & 38 deletions test/e2e/traceflow_test.go
Expand Up @@ -16,9 +16,11 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -53,7 +55,6 @@ type testcase struct {
// TestTraceflow is the top-level test which contains all subtests for
// Traceflow related test cases so they can share setup, teardown.
func TestTraceflow(t *testing.T) {
skipIfHasWindowsNodes(t)
skipIfTraceflowDisabled(t)

data, err := setupTest(t)
Expand Down Expand Up @@ -97,8 +98,12 @@ func testTraceflowIntraNodeANP(t *testing.T, data *TestData) {
k8sUtils, err = NewKubernetesUtils(data)
failOnError(err, t)

node1 := nodeName(0)
node1Pods, _, node1CleanupFn := createTestBusyboxPods(t, data, 3, testNamespace, node1)
nodeIdx := 0
if len(clusterInfo.windowsNodes) != 0 {
nodeIdx = clusterInfo.windowsNodes[0]
}
node1 := nodeName(nodeIdx)
node1Pods, _, node1CleanupFn := createTestAgnhostPods(t, data, 3, testNamespace, node1)
defer node1CleanupFn()

var denyIngress *v1alpha1.NetworkPolicy
Expand All @@ -121,7 +126,7 @@ func testTraceflowIntraNodeANP(t *testing.T, data *TestData) {
t.Errorf("Error when deleting Antrea NetworkPolicy: %v", err)
}
}()
antreaPod, err := data.getAntreaPodOnNode(node1)
antreaPod, err := data.getAntreaPodOnNode(controlPlaneNodeName())
if err = data.waitForNetworkpolicyRealized(antreaPod, denyIngressName, v1beta2.AntreaNetworkPolicy); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -282,10 +287,14 @@ func testTraceflowIntraNodeANP(t *testing.T, data *TestData) {

// testTraceflowIntraNode verifies if traceflow can trace intra node traffic with some NetworkPolicies set.
func testTraceflowIntraNode(t *testing.T, data *TestData) {
node1 := nodeName(0)
nodeIdx := 0
if len(clusterInfo.windowsNodes) != 0 {
nodeIdx = clusterInfo.windowsNodes[0]
}
node1 := nodeName(nodeIdx)

agentPod, _ := data.getAntreaPodOnNode(node1)
node1Pods, node1IPs, node1CleanupFn := createTestBusyboxPods(t, data, 3, testNamespace, node1)
node1Pods, node1IPs, node1CleanupFn := createTestAgnhostPods(t, data, 3, testNamespace, node1)
defer node1CleanupFn()
var pod0IPv4Str, pod1IPv4Str, dstPodIPv4Str, dstPodIPv6Str string
if node1IPs[0].ipv4 != nil {
Expand Down Expand Up @@ -328,7 +337,7 @@ func testTraceflowIntraNode(t *testing.T, data *TestData) {
}
}()

antreaPod, err := data.getAntreaPodOnNode(node1)
antreaPod, err := data.getAntreaPodOnNode(controlPlaneNodeName())
if err = data.waitForNetworkpolicyRealized(antreaPod, allowAllEgressName, v1beta2.K8sNetworkPolicy); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1097,11 +1106,17 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
skipIfProviderIs(t, "kind", "Skipping inter-Node Traceflow test for Kind because of #897")
}

node1 := nodeName(0)
node2 := nodeName(1)
nodeIdx0 := 0
nodeIdx1 := 1
if len(clusterInfo.windowsNodes) != 0 {
nodeIdx0 = clusterInfo.windowsNodes[0]
nodeIdx1 = clusterInfo.windowsNodes[1]
}
node1 := nodeName(nodeIdx0)
node2 := nodeName(nodeIdx1)

node1Pods, _, node1CleanupFn := createTestBusyboxPods(t, data, 1, testNamespace, node1)
node2Pods, node2IPs, node2CleanupFn := createTestBusyboxPods(t, data, 2, testNamespace, node2)
node1Pods, _, node1CleanupFn := createTestAgnhostPods(t, data, 1, testNamespace, node1)
node2Pods, node2IPs, node2CleanupFn := createTestAgnhostPods(t, data, 2, testNamespace, node2)
defer node1CleanupFn()
defer node2CleanupFn()
var dstPodIPv4Str, dstPodIPv6Str string
Expand All @@ -1114,23 +1129,26 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {

// Create Service backend Pod. The "hairpin" testcases require the Service to have a single backend Pod,
// and no more, in order to be deterministic.
nginxPodName := "nginx"
require.NoError(t, data.createNginxPodOnNode(nginxPodName, testNamespace, node2, false))
nginxIP, err := data.podWaitForIPs(defaultTimeout, nginxPodName, testNamespace)
agnhostPodName := "agnhost"
mutateFunc := func(pod *corev1.Pod) {
pod.Labels["app"] = "agnhost-server"
}
require.NoError(t, data.createPodOnNode(agnhostPodName, testNamespace, node2, agnhostImage, []string{"sleep", strconv.Itoa(3600)}, nil, nil, nil, false, mutateFunc))
agnhostIP, err := data.podWaitForIPs(defaultTimeout, agnhostPodName, testNamespace)
require.NoError(t, err)

var nginxIPv4Str, nginxIPv6Str, svcIPv4Name, svcIPv6Name string
if nginxIP.ipv4 != nil {
nginxIPv4Str = nginxIP.ipv4.String()
var agnhostIPv4Str, agnhostIPv6Str, svcIPv4Name, svcIPv6Name string
if agnhostIP.ipv4 != nil {
agnhostIPv4Str = agnhostIP.ipv4.String()
ipv4Protocol := corev1.IPv4Protocol
svcIPv4, err := data.createNginxClusterIPService("nginx-ipv4", testNamespace, false, &ipv4Protocol)
svcIPv4, err := data.createService("agnhost-ipv4", testNamespace, 80, 8080, map[string]string{"app": "agnhost-server"}, false, false, corev1.ServiceTypeClusterIP, &ipv4Protocol)
require.NoError(t, err)
svcIPv4Name = svcIPv4.Name
}
if nginxIP.ipv6 != nil {
nginxIPv6Str = nginxIP.ipv6.String()
if agnhostIP.ipv6 != nil {
agnhostIPv6Str = agnhostIP.ipv6.String()
ipv6Protocol := corev1.IPv6Protocol
svcIPv6, err := data.createNginxClusterIPService("nginx-ipv6", testNamespace, false, &ipv6Protocol)
svcIPv6, err := data.createService("agnhost-ipv6", testNamespace, 80, 8080, map[string]string{"app": "agnhost-server"}, false, false, corev1.ServiceTypeClusterIP, &ipv6Protocol)
require.NoError(t, err)
svcIPv6Name = svcIPv6.Name
}
Expand Down Expand Up @@ -1160,7 +1178,7 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
}
}()

antreaPod, err := data.getAntreaPodOnNode(node2)
antreaPod, err := data.getAntreaPodOnNode(controlPlaneNodeName())
if err = data.waitForNetworkpolicyRealized(antreaPod, allowAllEgressName, v1beta2.K8sNetworkPolicy); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1398,8 +1416,8 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
},
{
Component: v1alpha1.ComponentLB,
Pod: fmt.Sprintf("%s/%s", testNamespace, nginxPodName),
TranslatedDstIP: nginxIPv4Str,
Pod: fmt.Sprintf("%s/%s", testNamespace, agnhostPodName),
TranslatedDstIP: agnhostIPv4Str,
Action: v1alpha1.ActionForwarded,
},
{
Expand Down Expand Up @@ -1435,12 +1453,12 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
ipVersion: 4,
tf: &v1alpha1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: randName(fmt.Sprintf("%s-%s-to-svc-%s-", testNamespace, nginxPodName, svcIPv4Name)),
Name: randName(fmt.Sprintf("%s-%s-to-svc-%s-", testNamespace, agnhostPodName, svcIPv4Name)),
},
Spec: v1alpha1.TraceflowSpec{
Source: v1alpha1.Source{
Namespace: testNamespace,
Pod: nginxPodName,
Pod: agnhostPodName,
},
Destination: v1alpha1.Destination{
Namespace: testNamespace,
Expand Down Expand Up @@ -1470,9 +1488,9 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
},
{
Component: v1alpha1.ComponentLB,
Pod: fmt.Sprintf("%s/%s", testNamespace, nginxPodName),
Pod: fmt.Sprintf("%s/%s", testNamespace, agnhostPodName),
TranslatedSrcIP: "169.254.169.252",
TranslatedDstIP: nginxIPv4Str,
TranslatedDstIP: agnhostIPv4Str,
Action: v1alpha1.ActionForwarded,
},
{
Expand Down Expand Up @@ -1776,8 +1794,8 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
},
{
Component: v1alpha1.ComponentLB,
Pod: fmt.Sprintf("%s/%s", testNamespace, nginxPodName),
TranslatedDstIP: nginxIPv6Str,
Pod: fmt.Sprintf("%s/%s", testNamespace, agnhostPodName),
TranslatedDstIP: agnhostIPv6Str,
Action: v1alpha1.ActionForwarded,
},
{
Expand Down Expand Up @@ -1813,12 +1831,12 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
ipVersion: 6,
tf: &v1alpha1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: randName(fmt.Sprintf("%s-%s-to-svc-%s-", testNamespace, nginxPodName, svcIPv6Name)),
Name: randName(fmt.Sprintf("%s-%s-to-svc-%s-", testNamespace, agnhostPodName, svcIPv6Name)),
},
Spec: v1alpha1.TraceflowSpec{
Source: v1alpha1.Source{
Namespace: testNamespace,
Pod: nginxPodName,
Pod: agnhostPodName,
},
Destination: v1alpha1.Destination{
Namespace: testNamespace,
Expand Down Expand Up @@ -1848,9 +1866,9 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
},
{
Component: v1alpha1.ComponentLB,
Pod: fmt.Sprintf("%s/%s", testNamespace, nginxPodName),
Pod: fmt.Sprintf("%s/%s", testNamespace, agnhostPodName),
TranslatedSrcIP: "fc00::aabb:ccdd:eeff",
TranslatedDstIP: nginxIPv6Str,
TranslatedDstIP: agnhostIPv6Str,
Action: v1alpha1.ActionForwarded,
},
{
Expand Down Expand Up @@ -1943,9 +1961,13 @@ func testTraceflowInterNode(t *testing.T, data *TestData) {
}

func testTraceflowExternalIP(t *testing.T, data *TestData) {
node := nodeName(0)
nodeIP := nodeIP(0)
podNames, _, cleanupFn := createTestBusyboxPods(t, data, 1, testNamespace, node)
nodeIdx := 0
if len(clusterInfo.windowsNodes) != 0 {
nodeIdx = clusterInfo.windowsNodes[0]
}
node := nodeName(nodeIdx)
nodeIP := nodeIP(nodeIdx)
podNames, _, cleanupFn := createTestAgnhostPods(t, data, 1, testNamespace, node)
defer cleanupFn()

testcase := testcase{
Expand Down Expand Up @@ -2006,6 +2028,24 @@ func (data *TestData) waitForTraceflow(t *testing.T, name string, phase v1alpha1
if tf != nil {
t.Errorf("Latest Traceflow status: %v", tf.Status)
}
// TODO: DEBUG
rc, stdout, stderr, err2 := RunCommandOnNode(controlPlaneNodeName(), "kubectl get po -A -owide")
t.Logf("Pods: %d, %s, %s, %+v", rc, stdout, stderr, err2)
podName, _ := data.getAntreaPodOnNode(controlPlaneNodeName())
stdout, stderr, err2 = data.runCommandFromPod(antreaNamespace, podName, "antrea-agent", []string{"ovs-ofctl", "-O", "OpenFlow13", "dump-flows", "br-int"})
t.Logf("Master flow: %s, %s, %+v", stdout, stderr, err2)
if len(clusterInfo.windowsNodes) != 0 {
nodeIdx := clusterInfo.windowsNodes[0]
rc, stdout, stderr, err2 = RunCommandOnNode(nodeName(nodeIdx), "ovs-ofctl -O OpenFlow13 dump-flows br-int")
t.Logf("Windows flow: %d, %s, %s, %+v", rc, stdout, stderr, err2)
reasonMap := map[string]string{}
json.Unmarshal([]byte(tf.Status.Reason), &reasonMap)
cmd := fmt.Sprintf("ovs-appctl ofproto/trace br-int in_port=%s,icmp,nw_ttl=64,dl_src=%s,dl_dst=%s,nw_src=%s,nw_dst=%s,nw_tos=%s", reasonMap["in_port"], reasonMap["dl_src"], reasonMap["dl_dst"], reasonMap["nw_src"], reasonMap["nw_dst"], reasonMap["nw_tos"])
t.Logf("cmd: %s", cmd)
rc, stdout, stderr, err2 = RunCommandOnNode(nodeName(nodeIdx), cmd)
t.Logf("Windows trace: %d, %s, %s, %+v", rc, stdout, stderr, err2)
}
// TODO: DEBUG
return nil, err
}
return tf, nil
Expand Down Expand Up @@ -2169,7 +2209,7 @@ func runTestTraceflow(t *testing.T, data *TestData, tc testcase) {
// Give a little time for Nodes to install OVS flows.
time.Sleep(time.Second * 2)
// Send an ICMP echo packet from the source Pod to the destination.
if err := data.runPingCommandFromTestPod(podInfo{srcPod, "linux", "", ""}, testNamespace, dstPodIPs, busyboxContainerName, 2, 0); err != nil {
if err := data.runPingCommandFromTestPod(podInfo{srcPod, "linux", "", ""}, testNamespace, dstPodIPs, agnhostContainerName, 2, 0); err != nil {
t.Logf("Ping '%s' -> '%v' failed: ERROR (%v)", srcPod, *dstPodIPs, err)
}
}
Expand Down

0 comments on commit cf8e369

Please sign in to comment.