Skip to content

Commit

Permalink
Add e2e multicast testing.
Browse files Browse the repository at this point in the history
Create a multicast source in one node and 2 listener in another
node.
One of the listener should join the group and receive the multicast
traffic, and the other listener should try to join another group
that does not exist and fail to receive any multicast traffic.

Organize the e2e folder correctly to avoid issues with go modules and
imports.

Disable IPv6 multicast tests for now because iperf in agnhost images
doesn't support binding a server to an IPv6 multicast group:

  bash-4.3# iperf -s -B ff00:0:3:3::3 -u -t 30 -i 5 -V
  error: Try again

Co-authored-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Antonio Ojea <aojea@redhat.com>
Signed-off-by: Dumitru Ceara <dceara@redhat.com>
  • Loading branch information
Antonio Ojea and dceara committed Dec 21, 2020
1 parent cbc7294 commit e6f2d17
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/test.yml
Expand Up @@ -111,8 +111,10 @@ jobs:
target:
- shard: shard-conformance
hybrid-overlay: false
multicast-enable: false
- shard: control-plane
hybrid-overlay: true
multicast-enable: true
ha:
- enabled: "true"
name: "HA"
Expand Down Expand Up @@ -157,6 +159,10 @@ jobs:
- {"ipfamily": {"ip": ipv6}, "ha": {"enabled": "false"}, "gateway-mode": shared}
- {"ipfamily": {"ip": dualstack}, "ha": {"enabled": "true"}, "gateway-mode": shared}
- {"ipfamily": {"ip": dualstack}, "ha": {"enabled": "false"}}
# IPv6 multicast is supported but tests fail due to old iperf version
# in agnhost images. Disable them for now.
- {"ipfamily": {"ip": dualstack}, "target": {"shard": {"multicast-enable": "true"}}}
- {"ipfamily": {"ip": ipv6}, "target": {"shard": {"multicast-enable": "true"}}}
needs: [build]
env:
JOB_NAME: "${{ matrix.target.shard }}-${{ matrix.ha.name }}-${{ matrix.gateway-mode }}-${{ matrix.ipfamily.name }}"
Expand All @@ -165,6 +171,7 @@ jobs:
KIND_IPV6_SUPPORT: "${{ matrix.ipfamily.ipv6 }}"
OVN_HYBRID_OVERLAY_ENABLE: "${{ matrix.target.hybrid-overlay }}"
OVN_GATEWAY_MODE: "${{ matrix.gateway-mode }}"
OVN_MULTICAST_ENABLE: "${{ matrix.target.multicast-enable }}"
steps:

- name: Free up disk space
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/e2e_suite_test.go
@@ -1,4 +1,4 @@
package e2e_test
package e2e

import (
"flag"
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/e2e_test.go
@@ -1,4 +1,4 @@
package e2e_test
package e2e

import (
"encoding/json"
Expand Down
1 change: 1 addition & 0 deletions test/e2e/go.mod
Expand Up @@ -31,6 +31,7 @@ require (
github.com/onsi/gomega v1.9.0
k8s.io/api v0.17.4
k8s.io/apimachinery v0.17.4
k8s.io/client-go v0.17.4
k8s.io/klog v1.0.0
k8s.io/kubectl v0.0.0
k8s.io/kubernetes v1.17.2
Expand Down
130 changes: 130 additions & 0 deletions test/e2e/multicast.go
@@ -0,0 +1,130 @@
package e2e

import (
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
)

const (
mcastSource = "pod-client"
mcastServer1 = "pod-server1"
mcastServer2 = "pod-server2"
)

var _ = ginkgo.Describe("Multicast", func() {

fr := framework.NewDefaultFramework("multicast")

type nodeInfo struct {
name string
nodeIP string
}

var (
cs clientset.Interface
ns string
clientNodeInfo, serverNodeInfo nodeInfo
)

ginkgo.BeforeEach(func() {
cs = fr.ClientSet
ns = fr.Namespace.Name

nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
framework.ExpectNoError(err)
if len(nodes.Items) < 2 {
framework.Skipf(
"Test requires >= 2 Ready nodes, but there are only %v nodes",
len(nodes.Items))
}

ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)

clientNodeInfo = nodeInfo{
name: nodes.Items[0].Name,
nodeIP: ips[0],
}

serverNodeInfo = nodeInfo{
name: nodes.Items[1].Name,
nodeIP: ips[1],
}

// annotate namespace to enable multicast
namespace, err := fr.ClientSet.CoreV1().Namespaces().Get(ns, metav1.GetOptions{})
framework.ExpectNoError(err, "Error getting Namespace %v: %v", fr.Namespace.Name, err)
if namespace.ObjectMeta.Annotations == nil {
namespace.ObjectMeta.Annotations = map[string]string{}
}
namespace.ObjectMeta.Annotations["k8s.ovn.org/multicast-enabled"] = "true"
_, err = fr.ClientSet.CoreV1().Namespaces().Update(namespace)
framework.ExpectNoError(err, "Error updating Namespace %v: %v", ns, err)
})

ginkgo.It("should be able to send multicast UDP traffic between nodes", func() {
mcastGroup := "224.3.3.3"
mcastGroupBad := "224.5.5.5"
if IsIPv6Cluster(cs) {
mcastGroup = "ff3e::4321:1234"
mcastGroupBad = "ff3e::4321:1235"
}

// Start the multicast source (iperf client is the sender in multicast)
ginkgo.By("creating a pod as a multicast source in node " + clientNodeInfo.name)
// multicast group (-c 224.3.3.3), UDP (-u), TTL (-T 2), during (-t 3000) seconds, report every (-i 5) seconds
iperf := fmt.Sprintf("iperf -c %s -u -T 2 -t 3000 -i 5", mcastGroup)
if IsIPv6Cluster(cs) {
iperf = iperf + " -V"
}
cmd := []string{"/bin/sh", "-c", iperf}
clientPod := newAgnhostPod(mcastSource, cmd...)
clientPod.Spec.NodeName = clientNodeInfo.name
fr.PodClient().CreateSync(clientPod)

// Start a multicast listener on the same groups and verify it received the traffic (iperf server is the multicast listener)
// join multicast group (-B 224.3.3.3), UDP (-u), during (-t 30) seconds, report every (-i 1) seconds
ginkgo.By("creating first multicast listener pod in node " + serverNodeInfo.name)
iperf = fmt.Sprintf("iperf -s -B %s -u -t 30 -i 5", mcastGroup)
if IsIPv6Cluster(cs) {
iperf = iperf + " -V"
}
cmd = []string{"/bin/sh", "-c", iperf}
mcastServerPod1 := newAgnhostPod(mcastServer1, cmd...)
mcastServerPod1.Spec.NodeName = serverNodeInfo.name
fr.PodClient().CreateSync(mcastServerPod1)

// Start a multicast listener on on other group and verify it does not receive the traffic (iperf server is the multicast listener)
// join multicast group (-B 224.4.4.4), UDP (-u), during (-t 30) seconds, report every (-i 1) seconds
ginkgo.By("creating second multicast listener pod in node " + serverNodeInfo.name)
iperf = fmt.Sprintf("iperf -s -B %s -u -t 30 -i 5", mcastGroupBad)
if IsIPv6Cluster(cs) {
iperf = iperf + " -V"
}
cmd = []string{"/bin/sh", "-c", iperf}
mcastServerPod2 := newAgnhostPod(mcastServer2, cmd...)
mcastServerPod2.Spec.NodeName = serverNodeInfo.name
fr.PodClient().CreateSync(mcastServerPod2)

ginkgo.By("checking if pod server1 received multicast traffic")
gomega.Eventually(func() (string, error) {
return e2epod.GetPodLogs(cs, ns, mcastServer1, mcastServer1)
},
30*time.Second, 1*time.Second).Should(gomega.ContainSubstring("connected"))

ginkgo.By("checking if pod server2 does not received multicast traffic")
gomega.Eventually(func() (string, error) {
return e2epod.GetPodLogs(cs, ns, mcastServer2, mcastServer2)
},
30*time.Second, 1*time.Second).ShouldNot(gomega.ContainSubstring("connected"))
})

})
44 changes: 42 additions & 2 deletions test/e2e/utils_test.go → test/e2e/util.go
@@ -1,12 +1,52 @@
package e2e_test
package e2e

import (
"encoding/json"
"fmt"
utilnet "k8s.io/utils/net"
"net"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
utilnet "k8s.io/utils/net"
)

const agnhostImage = "k8s.gcr.io/e2e-test-images/agnhost:2.21"

// newAgnhostPod returns a pod that uses the agnhost image. The image's binary supports various subcommands
// that behave the same, no matter the underlying OS.
func newAgnhostPod(name string, command ...string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: name,
Image: agnhostImage,
Command: command,
},
},
RestartPolicy: v1.RestartPolicyNever,
},
}
}

// IsIPv6Cluster returns true if the kubernetes default service is IPv6
func IsIPv6Cluster(c clientset.Interface) bool {
// Get the ClusterIP of the kubernetes service created in the default namespace
svc, err := c.CoreV1().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{})
if err != nil {
framework.Failf("Failed to get kubernetes service ClusterIP: %v", err)
}
if utilnet.IsIPv6String(svc.Spec.ClusterIP) {
return true
}
return false
}

// PodAnnotation describes the assigned network details for a single pod network. (The
// actual annotation may include the equivalent of multiple PodAnnotations.)
type PodAnnotation struct {
Expand Down

0 comments on commit e6f2d17

Please sign in to comment.