WIP: connectivity: add full egress gateway test suite
Signed-off-by: Gilberto Bertin <jibi@cilium.io>
jibi committed May 25, 2023
1 parent 3237c9b commit 8b8a856
Showing 6 changed files with 276 additions and 1 deletion.
10 changes: 10 additions & 0 deletions connectivity/check/context.go
@@ -302,6 +302,9 @@ func (ct *ConnectivityTest) SetupAndValidate(ctx context.Context) error {
if err := ct.validateDeployment(ctx); err != nil {
return err
}
if err := ct.patchEchoServicesWithExternalIPs(ctx); err != nil {
return err
}
if ct.params.Hubble {
if err := ct.enableHubbleClient(ctx); err != nil {
return fmt.Errorf("unable to create hubble client: %s", err)
@@ -760,6 +763,13 @@ func (ct *ConnectivityTest) PingCommand(peer TestPeer, ipFam IPFamily) []string
return cmd
}

func (ct *ConnectivityTest) DigCommand(peer TestPeer, ipFam IPFamily) []string {
cmd := []string{"dig", "+time=2", "kubernetes"}

cmd = append(cmd, fmt.Sprintf("@%s", peer.Address(ipFam)))
return cmd
}
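
For illustration, if the peer's IPv4 address were 10.96.0.10 (a hypothetical kube-dns ClusterIP; the actual address comes from peer.Address), the resulting command would be:

    dig +time=2 kubernetes @10.96.0.10

The +time=2 option caps each query attempt at two seconds, so a blackholed DNS path fails quickly instead of stalling the test.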

func (ct *ConnectivityTest) RandomClientPod() *Pod {
for _, p := range ct.clientPods {
return &p
106 changes: 105 additions & 1 deletion connectivity/check/deployment.go
@@ -17,9 +17,11 @@ import (
networkingv1 "k8s.io/api/networking/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/utils"
"github.com/cilium/cilium-cli/k8s"
)

@@ -236,6 +238,7 @@ type daemonSetParameters struct {
Labels map[string]string
HostNetwork bool
Tolerations []corev1.Toleration
Capabilities []corev1.Capability
}

func newDaemonSet(p daemonSetParameters) *appsv1.DaemonSet {
@@ -266,7 +269,7 @@ func newDaemonSet(p daemonSetParameters) *appsv1.DaemonSet {
ReadinessProbe: p.ReadinessProbe,
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Add: []corev1.Capability{"NET_RAW"},
Add: append([]corev1.Capability{"NET_RAW"}, p.Capabilities...),
},
},
},
@@ -814,6 +817,7 @@ func (ct *ConnectivityTest) deploy(ctx context.Context) error {
Tolerations: []corev1.Toleration{
{Operator: corev1.TolerationOpExists},
},
Capabilities: []corev1.Capability{"NET_ADMIN"},
})
_, err = ct.clients.src.CreateDaemonSet(ctx, ct.params.TestNamespace, ds, metav1.CreateOptions{})
if err != nil {
@@ -1202,6 +1206,106 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error {
return nil
}

// patchEchoServicesWithExternalIPs patches the echo services (echo-same-node
// and echo-other-node) so that their external IPs point to the IPs of the
// nodes running the echo pods.
//
// This needs to happen after validateDeployment, as the pods must first be
// scheduled on the nodes in order to retrieve their host IPs.
func (ct *ConnectivityTest) patchEchoServicesWithExternalIPs(ctx context.Context) error {
patchedServices := []*corev1.Service{}

echoSamePodHostIPs := []string{}
for _, client := range ct.clients.clients() {
echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "name=echo-same-node"})
if err != nil {
return fmt.Errorf("unable to list echo pods: %w", err)
}

for _, echoPod := range echoPods.Items {
echoSamePodHostIPs = append(echoSamePodHostIPs, echoPod.Status.HostIP)
}
}

for _, client := range ct.clients.clients() {
patch := fmt.Sprintf(`{"spec":{"externalIPs":["%s"], "externalTrafficPolicy": "Local"}}`, strings.Join(echoSamePodHostIPs, `","`))

service, err := client.PatchService(ctx, ct.params.TestNamespace, echoSameNodeDeploymentName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
return err
}

s := ct.echoServices[service.Name]
s.Service = service.DeepCopy()
ct.echoServices[service.Name] = s

patchedServices = append(patchedServices, service)
}

echoOtherPodHostIPs := []string{}
for _, client := range ct.clients.clients() {
echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "name=echo-other-node"})
if err != nil {
return fmt.Errorf("unable to list echo pods: %w", err)
}

for _, echoPod := range echoPods.Items {
echoOtherPodHostIPs = append(echoOtherPodHostIPs, echoPod.Status.HostIP)
}
}

for _, client := range ct.clients.clients() {
patch := fmt.Sprintf(`{"spec":{"externalIPs":["%s"], "externalTrafficPolicy": "Local"}}`, strings.Join(echoOtherPodHostIPs, `","`))

service, err := client.PatchService(ctx, ct.params.TestNamespace, echoOtherNodeDeploymentName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
return err
}

s := ct.echoServices[service.Name]
s.Service = service.DeepCopy()
ct.echoServices[service.Name] = s

patchedServices = append(patchedServices, service)
}

ensureFrontend := func() error {
for _, client := range ct.clients.clients() {
for _, ciliumPod := range ct.CiliumPods() {
for _, service := range patchedServices {
for _, ip := range service.Spec.ExternalIPs {
cmd := []string{"sh", "-c",
fmt.Sprintf("cilium bpf lb list --frontends | grep %s:%d", ip, service.Spec.Ports[0].Port)}

if out, err := client.ExecInPod(ctx, ciliumPod.Pod.Namespace, ciliumPod.Pod.Name, defaults.AgentContainerName, cmd); err != nil {
fmt.Println(out.String())
return err
}
}
}
}
}

return nil
}

w := utils.NewWaitObserver(ctx, utils.WaitParameters{Timeout: 10 * time.Second})
defer w.Cancel()
for {
if err := ensureFrontend(); err != nil {
if err := w.Retry(err); err != nil {
return fmt.Errorf("Failed to ensure external IPs are exposed: %w", err)
}

continue
}
break
}

return nil
}
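
The strategic merge patch assembled above is equivalent to running something like the following by hand (namespace, service name, and IP are illustrative):

    kubectl -n cilium-test patch service echo-same-node \
      -p '{"spec":{"externalIPs":["192.168.56.11"], "externalTrafficPolicy": "Local"}}'

Setting externalTrafficPolicy to Local preserves the client's source IP on the backend, which is what allows the egress gateway tests below to assert whether reply traffic was (or was not) masqueraded with the egress IP.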

// Validate that srcPod can query the DNS server on dstPod successfully
func (ct *ConnectivityTest) waitForPodDNS(ctx context.Context, srcPod, dstPod Pod) error {
ct.Logf("⌛ [%s] Waiting for pod %s to reach DNS server on %s pod...", ct.client.ClusterName(), srcPod.Name(), dstPod.Name())
40 changes: 40 additions & 0 deletions connectivity/check/peer.go
@@ -199,6 +199,46 @@ func (s Service) Labels() map[string]string {
return newMap
}

func (s Service) ToExternalIPService() ExternalIPService {
return ExternalIPService{
Service: s,
}
}

// ExternalIPService is a Service that is accessed through its external IPs
// rather than its ClusterIP. It implements interface TestPeer.
type ExternalIPService struct {
Service Service
}

func (s ExternalIPService) Name() string {
return s.Service.Name()
}

func (s ExternalIPService) Scheme() string {
return s.Service.Scheme()
}

func (s ExternalIPService) Path() string {
return s.Service.Path()
}

func (s ExternalIPService) Address(family IPFamily) string {

Check warning on line 226 in connectivity/check/peer.go (GitHub Actions / build): unused-parameter: parameter 'family' seems to be unused, consider removing or renaming it as _ (revive)
return s.Service.Service.Spec.ExternalIPs[0]
}

func (s ExternalIPService) Port() uint32 {
return uint32(s.Service.Service.Spec.Ports[0].NodePort)
}

func (s ExternalIPService) HasLabel(name, value string) bool {
return s.Service.HasLabel(name, value)
}

func (s ExternalIPService) Labels() map[string]string {
return s.Service.Labels()
}

// ExternalWorkload is an external workload acting as a peer in a
// connectivity test. It implements interface TestPeer.
type ExternalWorkload struct {
16 changes: 16 additions & 0 deletions connectivity/manifests/egress-gateway-policy.yaml
@@ -14,3 +14,19 @@ spec:
nodeSelector:
matchLabels:
kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
---
apiVersion: cilium.io/v2
kind: CiliumEgressGatewayPolicy
metadata:
name: cegp-sample-echo-service
spec:
selectors:
- podSelector:
matchLabels:
kind: echo
destinationCIDRs:
- 0.0.0.0/0
egressGateway:
nodeSelector:
matchLabels:
kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
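
As with the pre-existing cegp-sample policy above it, NODE_NAME_PLACEHOLDER is presumably substituted with the chosen gateway node's hostname before the manifest is applied. Once applied, the rendered policy can be inspected with, for example:

    kubectl get ciliumegressgatewaypolicies cegp-sample-echo-service -o yaml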
101 changes: 101 additions & 0 deletions connectivity/tests/egressgateway.go
@@ -14,6 +14,8 @@ import (
"github.com/cilium/cilium-cli/connectivity/check"
"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/utils"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EgressGateway is a test case which, given the cegp-sample
@@ -48,10 +50,37 @@ func (s *egressGateway) Run(ctx context.Context, t *check.Test) {

s.waitForBpfPolicyEntries(ctx, t)

// Ping hosts (pod to host connectivity)
i := 0
for _, client := range ct.ClientPods() {
client := client

for _, dst := range ct.HostNetNSPodsByNode() {
t.NewAction(s, fmt.Sprintf("ping-%d", i), &client, &dst, check.IPFamilyV4).Run(func(a *check.Action) {

Check failure on line 59 in connectivity/tests/egressgateway.go (GitHub Actions / build): G601: Implicit memory aliasing in for loop. (gosec)
a.ExecInPod(ctx, ct.PingCommand(dst, check.IPFamilyV4))
})
i++
}
}

// DNS query (pod to service connectivity)
i = 0
for _, client := range ct.ClientPods() {
kubeDnsService, err := ct.K8sClient().GetService(ctx, "kube-system", "kube-dns", metav1.GetOptions{})

Check warning on line 69 in connectivity/tests/egressgateway.go (GitHub Actions / build): var-naming: var kubeDnsService should be kubeDNSService (revive)
if err != nil {
t.Fatal("Cannot get kube-dns service")
}
kubeDnsServicePeer := check.Service{Service: kubeDnsService}

Check warning on line 73 in connectivity/tests/egressgateway.go (GitHub Actions / build): var-naming: var kubeDnsServicePeer should be kubeDNSServicePeer (revive)

t.NewAction(s, fmt.Sprintf("dig-%d", i), &client, kubeDnsServicePeer, check.IPFamilyV4).Run(func(a *check.Action) {

Check failure on line 75 in connectivity/tests/egressgateway.go (GitHub Actions / build): G601: Implicit memory aliasing in for loop. (gosec)
a.ExecInPod(ctx, ct.DigCommand(kubeDnsServicePeer, check.IPFamilyV4))
})
i++
}

// Traffic matching an egress gateway policy should leave the cluster masqueraded with the egress IP (pod to external service)
i = 0
for _, client := range ct.ClientPods() {
for _, externalEcho := range ct.ExternalEchoPods() {
t.NewAction(s, fmt.Sprintf("curl-%d", i), &client, externalEcho, check.IPFamilyV4).Run(func(a *check.Action) {

Check failure on line 85 in connectivity/tests/egressgateway.go (GitHub Actions / build): G601: Implicit memory aliasing in for loop. (gosec)
a.ExecInPod(ctx, ct.CurlClientIPCommand(externalEcho, check.IPFamilyV4))
@@ -64,6 +93,68 @@
i++
}
}

// When connecting from outside the cluster to a NodePort service whose pods are selected by an egress policy,
// the reply traffic should not be SNATed with the egress IP.
i = 0
for _, client := range ct.ExternalEchoPods() {
for _, echo := range ct.EchoServices() {
// convert the service to an ExternalIPService, as we want to access it through its external IP
echo := echo.ToExternalIPService()

t.NewAction(s, fmt.Sprintf("curl-%d", i), &client, echo, check.IPFamilyV4).Run(func(a *check.Action) {
a.ExecInPod(ctx, ct.CurlClientIPCommand(echo, check.IPFamilyV4))
})
i++
}
}

if status, ok := ct.Feature(check.FeatureTunnel); ok && status.Enabled == false {

Check failure on line 112 in connectivity/tests/egressgateway.go (GitHub Actions / build): S1002: should omit comparison to bool constant, can be simplified to `!status.Enabled` (gosimple)
// When connecting from outside the cluster directly to a pod which is selected by an egress policy, the
// reply traffic should not be SNATed with the egress IP (only connections originating from these pods
// should go through egress gateway).
//
// This test is executed only when Cilium is running in direct routing mode, since we can simply add a
// route on the node outside the cluster to direct pod's traffic to the node where the pod is running
// (while in tunneling mode we would need the external node to send the traffic over the tunnel)

for _, echoPod := range ct.EchoPods() {
targetPodHostIP := echoPod.Pod.Status.HostIP
targetPodIP := echoPod.Pod.Status.PodIP

for _, externalNode := range ct.NodesWithoutCilium() {
for node, hostNetNSPod := range ct.HostNetNSPodsByNode() {
if node != externalNode {
continue
}

cmd := []string{"ip", "route", "add", targetPodIP, "via", targetPodHostIP}
_, err := hostNetNSPod.K8sClient.ExecInPod(ctx, hostNetNSPod.Pod.Namespace, hostNetNSPod.Pod.Name, "", cmd)
if err != nil {
t.Fatalf("failed to add ip route: %w", err)
}

defer func(hostNetNSPod check.Pod) {
cmd := []string{"ip", "route", "del", targetPodIP, "via", targetPodHostIP}
if _, err := hostNetNSPod.K8sClient.ExecInPod(ctx, hostNetNSPod.Pod.Namespace, hostNetNSPod.Pod.Name, "", cmd); err != nil {
t.Fatalf("failed to delete ip route: %v", err)
}
}(hostNetNSPod)
}
}
}

i = 0
for _, client := range ct.ExternalEchoPods() {
for _, echo := range ct.EchoPods() {
t.NewAction(s, fmt.Sprintf("curl-%d", i), &client, echo, check.IPFamilyV4).Run(func(a *check.Action) {
a.ExecInPod(ctx, ct.CurlClientIPCommand(echo, check.IPFamilyV4))
})
i++
}
}
}
}
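
The route manipulation above amounts to running, on the node outside the cluster, commands of this shape (IPs are illustrative):

    # forward traffic for the pod IP to the node hosting the pod
    ip route add 10.244.1.23 via 192.168.56.12
    # cleanup, performed via defer at the end of the test
    ip route del 10.244.1.23 via 192.168.56.12

This only works in direct routing mode because the pod IP must be reachable via plain L3 forwarding to its host; with tunneling, the external node has no way to inject traffic into the overlay.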

// getGatewayNodeInternalIP returns the k8s internal IP of the node acting as
@@ -139,6 +230,16 @@ func (s *egressGateway) waitForBpfPolicyEntries(ctx context.Context, t *check.Te
})
}

for _, echo := range ct.EchoPods() {
targetEntries = append(targetEntries,
bpfEgressGatewayPolicyEntry{
SourceIP: echo.Pod.Status.PodIP,
DestCIDR: "0.0.0.0/0",
EgressIP: egressIP,
GatewayIP: gatewayNodeInternalIP.String(),
})
}

cmd := strings.Split("cilium bpf egress list -o json", " ")
stdout, err := ciliumPod.K8sClient.ExecInPod(ctx, ciliumPod.Pod.Namespace, ciliumPod.Pod.Name, defaults.AgentContainerName, cmd)
if err != nil {
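
For reference, a sketch of the JSON output consumed here, with illustrative values matching the bpfEgressGatewayPolicyEntry fields asserted above (the exact JSON field names are an assumption):

    cilium bpf egress list -o json
    [
      {
        "sourceIP": "10.244.1.23",
        "destCIDR": "0.0.0.0/0",
        "egressIP": "192.168.56.11",
        "gatewayIP": "192.168.56.11"
      }
    ]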
4 changes: 4 additions & 0 deletions k8s/client.go
@@ -253,6 +253,10 @@ func (c *Client) GetService(ctx context.Context, namespace, name string, opts me
return c.Clientset.CoreV1().Services(namespace).Get(ctx, name, opts)
}

func (c *Client) PatchService(ctx context.Context, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*corev1.Service, error) {
return c.Clientset.CoreV1().Services(namespace).Patch(ctx, name, pt, data, opts)
}

func (c *Client) CreateEndpoints(ctx context.Context, namespace string, ep *corev1.Endpoints, opts metav1.CreateOptions) (*corev1.Endpoints, error) {
return c.Clientset.CoreV1().Endpoints(namespace).Create(ctx, ep, opts)
}