Skip to content

Commit

Permalink
Fix OVS "flow" replay for groups (antrea-io#2134)
Browse files Browse the repository at this point in the history
The Group objects were not reset correctly when attempting to replay
them, leading to confusing error log messages and invalid datapath
state. We fix the implementation of Reset() for groups and we ensure
that the method is called during replay.

We also update the TestOVSFlowReplay e2e test to make sure it is more
comprehensive: instead of just checking Pod-to-Pod connectivity after a
replay, we ensure that the number of OVS flows / groups is the same
before and after a restart / replay. We confirmed that the updated test
fails when the patch is not applied.

Fixes antrea-io#2127
  • Loading branch information
antoninbas committed May 1, 2021
1 parent d832229 commit 5f8aea1
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 9 deletions.
6 changes: 6 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ func persistRoundNum(num uint64, bridgeClient ovsconfig.OVSBridgeClient, interva
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
// described above, We guarantee that at most two rounds of flows exist in the switch at any given
// time.
// Note that at the moment we assume that all OpenFlow groups are deleted every time there is an
// Antrea Agent restart. This allows us to add the necessary groups without having to worry about
// the operation failing because a (stale) group with the same ID already exists in OVS. This
// assumption is currently guaranteed by the ofnet implementation:
// https://github.com/wenyingd/ofnet/blob/14a78b27ef8762e45a0cfc858c4d07a4572a99d5/ofctrl/fgraphSwitch.go#L57-L62
// All previous groups have been deleted by the time the call to i.ofClient.Initialize returns.
func (i *Initializer) initOpenFlowPipeline() error {
roundInfo := getRoundInfo(i.ovsBridgeClient)
gateway, ok := i.ifaceStore.GetInterface(i.hostGateway)
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,10 @@ func (c *client) ReplayFlows() {
return true
}

c.groupCache.Range(func(id, gEntry interface{}) bool {
if err := gEntry.(binding.Group).Add(); err != nil {
c.groupCache.Range(func(id, value interface{}) bool {
group := value.(binding.Group)
group.Reset()
if err := group.Add(); err != nil {
klog.Errorf("Error when replaying cached group %d: %v", id, err)
}
return true
Expand Down
2 changes: 1 addition & 1 deletion pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type OFBridge struct {

func (b *OFBridge) CreateGroup(id GroupIDType) Group {
ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), ofctrl.GroupSelect)
if err != nil {
if err != nil { // group already exists
ofctrlGroup = b.ofSwitch.GetGroup(uint32(id))
}
g := &ofGroup{bridge: b, ofctrl: ofctrlGroup}
Expand Down
12 changes: 11 additions & 1 deletion pkg/ovs/openflow/ofctrl_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,18 @@ type ofGroup struct {
bridge *OFBridge
}

// Reset creates a new ofctrl.Group object for the updated ofSwitch. The
// ofSwitch keeps a list of all Group objects, so this operation is
// needed. Reset() should be called before replaying the Group to OVS.
func (g *ofGroup) Reset() {
g.ofctrl.Switch = g.bridge.ofSwitch
// An error ("group already exists") is not possible here since we are
// using a new instance of ofSwitch and re-creating a group which was
// created successfully before. There will be no duplicate group IDs. If
// something is wrong and there is an error, g.ofctrl will be set to nil
// and the Agent will crash later.
newGroup, _ := g.bridge.ofSwitch.NewGroup(g.ofctrl.ID, g.ofctrl.GroupType)
newGroup.Buckets = g.ofctrl.Buckets
g.ofctrl = newGroup
}

func (g *ofGroup) Add() error {
Expand Down
46 changes: 41 additions & 5 deletions test/e2e/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -326,8 +327,9 @@ func TestOVSRestartSameNode(t *testing.T) {
}

// TestOVSFlowReplay checks that when OVS restarts unexpectedly the Antrea agent takes care of
// replaying flows. More precisely this test checks that Pod connectivity still works after deleting
// the flows and force-restarting the OVS dameons.
// replaying flows. More precisely this test checks that we have the same number of flows and groups
// after deleting them and force-restarting the OVS daemons. We also make sure that Pod connectivity
// still works.
func TestOVSFlowReplay(t *testing.T) {
skipIfProviderIs(t, "kind", "stopping OVS daemons create connectivity issues")
data, err := setupTest(t)
Expand Down Expand Up @@ -359,15 +361,45 @@ func TestOVSFlowReplay(t *testing.T) {
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)

t.Logf("Deleting flows and restarting OVS daemons on Node '%s'", workerNode)
delFlows := func() {
countFlows := func() int {
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName}
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
count := strings.Count(stdout, "\n")
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return count
}
countGroups := func() int {
cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName}
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping groups: <%v>, err: <%v>", stderr, err)
}
count := strings.Count(stdout, "\n")
t.Logf("Counted %d group in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return count
}

numFlows1, numGroups1 := countFlows(), countGroups()

// This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the
// bridge. An alternative may be to kill the antrea-ovs container running on that Node.
t.Logf("Deleting flows / groups and restarting OVS daemons on Node '%s'", workerNode)
delFlowsAndGroups := func() {
cmd := []string{"ovs-ofctl", "del-flows", defaultBridgeName}
_, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting flows: <%v>, err: <%v>", stderr, err)
}
cmd = []string{"ovs-ofctl", "del-groups", defaultBridgeName}
_, stderr, err = data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting groups: <%v>, err: <%v>", stderr, err)
}
}
delFlows()
delFlowsAndGroups()
restartCmd := []string{"/usr/share/openvswitch/scripts/ovs-ctl", "--system-id=random", "restart", "--db-file=/var/run/openvswitch/conf.db"}
if stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, restartCmd); err != nil {
t.Fatalf("Error when restarting OVS with ovs-ctl: %v - stdout: %s - stderr: %s", err, stdout, stderr)
Expand All @@ -377,6 +409,10 @@ func TestOVSFlowReplay(t *testing.T) {
// interval.
t.Logf("Running second ping mesh to check that flows have been restored")
data.runPingMesh(t, podNames)

numFlows2, numGroups2 := countFlows(), countGroups()
assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay")
assert.Equal(t, numGroups1, numGroups2, "Mismatch in OVS group count after flow replay")
}

// TestPingLargeMTU verifies that fragmented ICMP packets are handled correctly. Until OVS 2.12.0,
Expand Down

0 comments on commit 5f8aea1

Please sign in to comment.