-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
kafka_policies.go
198 lines (150 loc) · 8.16 KB
/
kafka_policies.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package k8sTest
import (
"fmt"
"time"
. "github.com/onsi/gomega"
. "github.com/cilium/cilium/test/ginkgo-ext"
"github.com/cilium/cilium/test/helpers"
)
// The 5.4 CI job is intended to catch BPF complexity regressions and as such
// doesn't need to execute this test suite.
var _ = SkipDescribeIf(helpers.RunsOn54Kernel, "K8sKafkaPolicyTest", func() {
// Shared state for the Kafka policy suite. Everything below is captured by
// the hooks and the test body declared further down in this closure.
var (
// kubectl is the cluster client; it is assigned in BeforeAll and is nil
// until that hook has run.
kubectl *helpers.Kubectl
// these three are set in BeforeAll
l7Policy string
demoPath string
ciliumFilename string
// Application names used as keys into appPods.
kafkaApp = "kafka"
backupApp = "empire-backup"
empireHqApp = "empire-hq"
outpostApp = "empire-outpost"
apps = []string{kafkaApp, backupApp, empireHqApp, outpostApp}
// appPods is replaced in BeforeAll with the result of helpers.GetAppPods
// and is indexed by the application names above.
appPods = map[string]string{}
// Kafka topics: the first two are created by the test itself, topicTest is
// only used by BeforeAll as the topic-creation probe for broker readiness.
topicEmpireAnnounce = "empire-announce"
topicDeathstarPlans = "deathstar-plans"
topicTest = "test-topic"
// Shell commands executed inside the application pods. "prod*" commands
// pipe a message into kafka-produce.sh, "con*" commands read one message
// with kafka-consume.sh.
// NOTE(review): conOutpostAnnoune is misspelled ("Announce"); renaming it
// requires updating every use in this file.
prodHqAnnounce = `sh -c "echo 'Happy 40th Birthday to General Tagge' | ./kafka-produce.sh --topic empire-announce"`
conOutpostAnnoune = `sh -c "./kafka-consume.sh --topic empire-announce --from-beginning --max-messages 1"`
prodHqDeathStar = `sh -c "echo 'deathstar reactor design v3' | ./kafka-produce.sh --topic deathstar-plans"`
conOutDeathStar = `sh -c "./kafka-consume.sh --topic deathstar-plans --from-beginning --max-messages 1"`
// prodBackAnnounce is the same command string as prodHqAnnounce; it is kept
// separate because it is run from the backup pod.
prodBackAnnounce = `sh -c "echo 'Happy 40th Birthday to General Tagge' | ./kafka-produce.sh --topic empire-announce"`
prodOutAnnounce = `sh -c "echo 'Vader Booed at Empire Karaoke Party' | ./kafka-produce.sh --topic empire-announce"`
)
AfterFailed(func() {
kubectl.CiliumReport("cilium-dbg service list", "cilium-dbg endpoint list")
})
// Suite teardown: remove the Cilium installation created from ciliumFilename
// and close the SSH client held by kubectl.
AfterAll(func() {
	// kubectl is only assigned by the BeforeAll that lives inside the
	// skippable "Kafka Policy Tests" context, whereas this hook is registered
	// outside of it. If that setup never ran there is nothing to uninstall
	// and no client to close, so bail out instead of dereferencing nil.
	if kubectl == nil {
		return
	}
	UninstallCiliumFromManifest(kubectl, ciliumFilename)
	kubectl.CloseSSHClient()
})
// Tests involving the L7 proxy do not work when built with -race, see issue #13757.
SkipContextIf(func() bool { return helpers.SkipRaceDetectorEnabled() || helpers.RunsOnAKS() }, "Kafka Policy Tests", func() {
createTopicCmd := func(topic string) string {
return fmt.Sprintf("/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create "+
"--zookeeper localhost:2181 --replication-factor 1 "+
"--partitions 1 --topic %s", topic)
}
// createTopic runs the topic-creation command for topic inside pod (default
// namespace) and returns the error reported by the exec helper, if any.
createTopic := func(topic string, pod string) error {
	cmd := createTopicCmd(topic)
	return kubectl.ExecKafkaPodCmd(helpers.DefaultNamespace, pod, cmd)
}
// waitForKafkaBroker blocks until the broker is usable: it keeps executing
// cmd inside pod until the command succeeds, and returns an error if that
// has not happened within helpers.HelperTimeout.
waitForKafkaBroker := func(pod string, cmd string) error {
	brokerReady := func() bool {
		return kubectl.ExecKafkaPodCmd(helpers.DefaultNamespace, pod, cmd) == nil
	}
	timeout := &helpers.TimeoutConfig{Timeout: helpers.HelperTimeout}
	return helpers.WithTimeout(brokerReady, "Kafka Broker not ready", timeout)
}
// waitForDNSResolution repeatedly runs nslookup for service inside pod until
// the lookup succeeds, giving up with an error after 240 seconds.
waitForDNSResolution := func(pod, service string) error {
	lookupCmd := fmt.Sprintf("nslookup %s", service)
	failureMsg := fmt.Sprintf("unable to resolve DNS for service %s in pod %s", service, pod)
	resolved := func() bool {
		return kubectl.ExecPodCmd(helpers.DefaultNamespace, pod, lookupCmd).WasSuccessful()
	}
	return helpers.WithTimeout(resolved, failureMsg, &helpers.TimeoutConfig{Timeout: 240 * time.Second})
}
// Right after each test, validate the logs covering the duration of the test
// that just ran.
JustAfterEach(func() {
	elapsed := CurrentGinkgoTestDescription().Duration
	kubectl.ValidateNoErrorsInLogs(elapsed)
})
// Per-test cleanup: delete the demo application and the L7 policy, then wait
// for all pods to terminate.
AfterEach(func() {
	// Deletion is best-effort here: the results are deliberately not
	// asserted so that every manifest gets a delete attempt.
	for _, manifest := range []string{demoPath, l7Policy} {
		_ = kubectl.Delete(manifest)
	}
	ExpectAllPodsTerminated(kubectl)
})
// One-time setup: create the kubectl client, deploy Cilium and DNS, apply the
// Kafka demo application and wait until pods, DNS entry, endpoints and the
// Kafka broker are all ready.
BeforeAll(func() {
	kubectl = helpers.CreateKubectl(helpers.K8s1VMName(), logger)

	// Manifest paths are resolved against the kubectl base path.
	l7Policy = helpers.ManifestGet(kubectl.BasePath(), "kafka-sw-security-policy.yaml")
	demoPath = helpers.ManifestGet(kubectl.BasePath(), "kafka-sw-app.yaml")

	ciliumFilename = helpers.TimestampFilename("cilium.yaml")
	DeployCiliumAndDNS(kubectl, ciliumFilename)

	kubectl.ApplyDefault(demoPath)

	Expect(kubectl.WaitforPods(helpers.DefaultNamespace, "-l zgroup=kafkaTestApp", helpers.HelperTimeout)).
		Should(BeNil(), "Kafka Pods are not ready after timeout")

	Expect(kubectl.WaitForKubeDNSEntry("kafka-service", helpers.DefaultNamespace)).
		To(BeNil(), "DNS entry of kafka-service is not ready after timeout")

	Expect(kubectl.CiliumEndpointWaitReady()).
		To(BeNil(), "Endpoints are not ready after timeout")

	appPods = helpers.GetAppPods(apps, helpers.DefaultNamespace, kubectl, "app")

	By("Wait for Kafka broker to be up")
	// Creating the throw-away test topic doubles as the readiness probe.
	brokerPod := appPods[kafkaApp]
	probeCmd := createTopicCmd(topicTest)
	Expect(waitForKafkaBroker(brokerPod, probeCmd)).
		To(BeNil(), "Timeout: Kafka cluster failed to come up correctly")
})
// KafkaPolicies checks Kafka connectivity in two phases:
//  1. without any policy, every produce/consume command must succeed;
//  2. after importing the L7 Kafka policy, the empire-hq produce commands and
//     the outpost consume on empire-announce must still succeed, while
//     producing from backup/outpost on empire-announce and consuming
//     deathstar-plans from outpost must fail.
It("KafkaPolicies", func() {
	// waitUntilSucceed retries the Kafka command arg inside pod for up to 60
	// seconds and fails the test (reported at the caller's line) if it never
	// succeeds. The last command error is appended to the failure message.
	waitUntilSucceed := func(pod, arg, description string) {
		var lastErr error
		body := func() bool {
			lastErr = kubectl.ExecKafkaPodCmd(helpers.DefaultNamespace, pod, arg)
			return lastErr == nil
		}
		// Use a local error so this helper does not overwrite the err
		// variable of the enclosing test body.
		timeoutErr := helpers.WithTimeout(body, description, &helpers.TimeoutConfig{Timeout: 60 * time.Second})
		// Pass format and arguments separately: Gomega formats the
		// annotation itself, so a pre-formatted string containing '%' would
		// be interpreted as a format string a second time.
		ExpectWithOffset(1, timeoutErr).Should(BeNil(), "%s: %v", description, lastErr)
	}

	By("Creating new kafka topic %s", topicEmpireAnnounce)
	err := createTopic(topicEmpireAnnounce, appPods[kafkaApp])
	Expect(err).Should(BeNil(), "Failed to create topic empire-announce")

	By("Creating new kafka topic %s", topicDeathstarPlans)
	err = createTopic(topicDeathstarPlans, appPods[kafkaApp])
	Expect(err).Should(BeNil(), "Failed to create topic deathstar-plans")

	By("Waiting for DNS to resolve within pods for kafka-service")
	err = waitForDNSResolution(appPods[empireHqApp], "kafka-service")
	Expect(err).Should(BeNil(), "Failed to resolve kafka-service DNS entry in pod %s", appPods[empireHqApp])
	err = waitForDNSResolution(appPods[outpostApp], "kafka-service")
	Expect(err).Should(BeNil(), "Failed to resolve kafka-service DNS entry in pod %s", appPods[outpostApp])

	By("Testing basic Kafka Produce and Consume")
	// We need to produce first, since consumer script waits for
	// some messages to be already there by the producer.
	waitUntilSucceed(appPods[empireHqApp], prodHqAnnounce, "Failed to produce from empire-hq on topic empire-announce")
	waitUntilSucceed(appPods[outpostApp], conOutpostAnnoune, "Failed to consume from outpost on topic empire-announce")
	waitUntilSucceed(appPods[empireHqApp], prodHqDeathStar, "Failed to produce from empire-hq on topic deathstar-plans")
	waitUntilSucceed(appPods[outpostApp], conOutDeathStar, "Failed to consume from outpost on topic deathstar-plans")
	// These two commands are executed in the backup and outpost pods, i.e.
	// the messages are produced *from* those pods.
	waitUntilSucceed(appPods[backupApp], prodBackAnnounce, "Failed to produce from backup on topic empire-announce")
	waitUntilSucceed(appPods[outpostApp], prodOutAnnounce, "Failed to produce from outpost on topic empire-announce")

	By("Apply L7 kafka policy and wait")
	_, err = kubectl.CiliumPolicyAction(
		helpers.DefaultNamespace, l7Policy,
		helpers.KubectlApply, helpers.HelperTimeout)
	Expect(err).To(BeNil(), "L7 policy cannot be imported correctly")

	By("Testing Kafka L7 policy enforcement status")
	waitUntilSucceed(appPods[empireHqApp], prodHqAnnounce, "Failed to produce from empire-hq on topic empire-announce")
	waitUntilSucceed(appPods[outpostApp], conOutpostAnnoune, "Failed to consume from outpost on topic empire-announce")
	waitUntilSucceed(appPods[empireHqApp], prodHqDeathStar, "Failed to produce from empire-hq on topic deathstar-plans")
	// NOTE(review): the empire-announce consume is repeated here rather than
	// consuming deathstar-plans; presumably intentional, since consuming
	// deathstar-plans from outpost is asserted to be denied below.
	waitUntilSucceed(appPods[outpostApp], conOutpostAnnoune, "Failed to consume from outpost on topic empire-announce")

	// The remaining commands must fail once the policy is enforced.
	err = kubectl.ExecKafkaPodCmd(
		helpers.DefaultNamespace, appPods[backupApp], prodBackAnnounce)
	Expect(err).Should(HaveOccurred(), "Produce from backup on topic empire-announce should have been denied by egress policy")

	err = kubectl.ExecKafkaPodCmd(
		helpers.DefaultNamespace, appPods[outpostApp], conOutDeathStar)
	Expect(err).Should(HaveOccurred(), "Consume from outpost on topic deathstar-plans should have been denied")

	err = kubectl.ExecKafkaPodCmd(
		helpers.DefaultNamespace, appPods[outpostApp], prodOutAnnounce)
	Expect(err).Should(HaveOccurred(), "Produce from outpost on topic empire-announce should have been denied")
})
})
})