Skip to content

Commit

Permalink
test/k8s: make kafka tests more reliable
Browse files Browse the repository at this point in the history
Waiting until the Kafka command succeeds will prevent any flakiness
in the tests.

Signed-off-by: André Martins <andre@cilium.io>
  • Loading branch information
aanm committed Jun 13, 2023
1 parent e2555a0 commit 66ad757
Showing 1 changed file with 24 additions and 30 deletions.
54 changes: 24 additions & 30 deletions test/k8s/kafka_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,29 +149,30 @@ var _ = SkipDescribeIf(helpers.RunsOn54Kernel, "K8sKafkaPolicyTest", func() {
// We need to produce first, since consumer script waits for
// some messages to be already there by the producer.

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[empireHqApp], prodHqAnnounce)
Expect(err).Should(BeNil(), "Failed to produce from empire-hq on topic empire-announce")
waitUntilSucceed := func(pod, arg, description string) {
var errBody error
body := func() bool {
errBody = kubectl.ExecKafkaPodCmd(helpers.DefaultNamespace, pod, arg)
if errBody != nil {
return false
}
return true
}
err = helpers.WithTimeout(body, description, &helpers.TimeoutConfig{Timeout: 60 * time.Second})

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[outpostApp], conOutpostAnnoune)
Expect(err).Should(BeNil(), "Failed to consume from outpost on topic empire-announce")
ExpectWithOffset(1, err).Should(BeNil(), fmt.Sprintf("%s: %s", description, errBody))
}
waitUntilSucceed(appPods[empireHqApp], prodHqAnnounce, "Failed to produce from empire-hq on topic empire-announce")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[empireHqApp], prodHqDeathStar)
Expect(err).Should(BeNil(), "Failed to produce from empire-hq on topic deathstar-plans")
waitUntilSucceed(appPods[outpostApp], conOutpostAnnoune, "Failed to consume from outpost on topic empire-announce")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[outpostApp], conOutDeathStar)
Expect(err).Should(BeNil(), "Failed to consume from outpost on topic deathstar-plans")
waitUntilSucceed(appPods[empireHqApp], prodHqDeathStar, "Failed to produce from empire-hq on topic deathstar-plans")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[backupApp], prodBackAnnounce)
Expect(err).Should(BeNil(), "Failed to produce to backup on topic empire-announce")
waitUntilSucceed(appPods[outpostApp], conOutDeathStar, "Failed to consume from outpost on topic deathstar-plans")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[outpostApp], prodOutAnnounce)
Expect(err).Should(BeNil(), "Failed to produce to outpost on topic empire-announce")
waitUntilSucceed(appPods[backupApp], prodBackAnnounce, "Failed to produce to backup on topic empire-announce")

waitUntilSucceed(appPods[outpostApp], prodOutAnnounce, "Failed to produce to outpost on topic empire-announce")

By("Apply L7 kafka policy and wait")

Expand All @@ -181,21 +182,14 @@ var _ = SkipDescribeIf(helpers.RunsOn54Kernel, "K8sKafkaPolicyTest", func() {
Expect(err).To(BeNil(), "L7 policy cannot be imported correctly")

By("Testing Kafka L7 policy enforcement status")
err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[empireHqApp], prodHqAnnounce)
Expect(err).Should(BeNil(), "Failed to produce from empire-hq on topic empire-announce")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[outpostApp], conOutpostAnnoune)
Expect(err).Should(BeNil(), "Failed to consume from outpost on topic empire-announce")
waitUntilSucceed(appPods[empireHqApp], prodHqAnnounce, "Failed to produce from empire-hq on topic empire-announce")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[empireHqApp], prodHqDeathStar)
Expect(err).Should(BeNil(), "Failed to produce from empire-hq on topic deathstar-plans")
waitUntilSucceed(appPods[outpostApp], conOutpostAnnoune, "Failed to consume from outpost on topic empire-announce")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[outpostApp], conOutpostAnnoune)
Expect(err).Should(BeNil(), "Failed to consume from outpost on topic empire-announce")
waitUntilSucceed(appPods[empireHqApp], prodHqDeathStar, "Failed to produce from empire-hq on topic deathstar-plans")

waitUntilSucceed(appPods[outpostApp], conOutpostAnnoune, "Failed to consume from outpost on topic empire-announce")

err = kubectl.ExecKafkaPodCmd(
helpers.DefaultNamespace, appPods[backupApp], prodBackAnnounce)
Expand Down

0 comments on commit 66ad757

Please sign in to comment.