Fix OVS "flow" replay for groups #2134

Merged: 3 commits, Apr 29, 2021
Changes from all commits
8 changes: 7 additions & 1 deletion pkg/agent/agent.go
@@ -281,6 +281,12 @@ func persistRoundNum(num uint64, bridgeClient ovsconfig.OVSBridgeClient, interva
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
// described above, we guarantee that at most two rounds of flows exist in the switch at any given
// time.
// Note that at the moment we assume that all OpenFlow groups are deleted every time there is an
// Antrea Agent restart. This allows us to add the necessary groups without having to worry about
// the operation failing because a (stale) group with the same ID already exists in OVS. This
// assumption is currently guaranteed by the ofnet implementation:
// https://github.com/wenyingd/ofnet/blob/14a78b27ef8762e45a0cfc858c4d07a4572a99d5/ofctrl/fgraphSwitch.go#L57-L62
// All previous groups have been deleted by the time the call to i.ofClient.Initialize returns.
func (i *Initializer) initOpenFlowPipeline() error {
roundInfo := getRoundInfo(i.ovsBridgeClient)

@@ -427,7 +433,7 @@ func (i *Initializer) FlowRestoreComplete() error {
if err != nil {
if err == wait.ErrWaitTimeout {
// This could happen if the method is triggered by an OVS disconnection event, in which case OVS doesn't restart.
klog.Info("flow-restore-wait was not true, skip cleaning up it")
klog.Info("flow-restore-wait was not true, skip cleaning it up")
return nil
}
return err
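
The comment added to initOpenFlowPipeline above relies on ofnet deleting every OpenFlow group when the switch (re)connects. The following minimal sketch, built from hypothetical types that are not Antrea or ofnet code, only illustrates why that cleanup matters: without it, re-adding a group whose ID survived the restart would fail.

```go
// Hypothetical sketch of the assumption documented in initOpenFlowPipeline:
// if stale groups are wiped when the switch (re)connects, re-adding a group
// with the same ID cannot fail with "group already exists".
package main

import (
	"errors"
	"fmt"
)

// switchState models the bridge-side registry of installed OpenFlow groups.
type switchState struct {
	groups map[uint32]struct{}
}

var errGroupExists = errors.New("group already exists")

// addGroup fails on a duplicate ID, mirroring the failure mode the comment
// is concerned about.
func (s *switchState) addGroup(id uint32) error {
	if _, ok := s.groups[id]; ok {
		return errGroupExists
	}
	s.groups[id] = struct{}{}
	return nil
}

// clearGroups stands in for the cleanup assumed to happen on connection,
// before the Agent installs the groups for the new round.
func (s *switchState) clearGroups() {
	s.groups = map[uint32]struct{}{}
}

func main() {
	// A stale group (ID 1) is left over from the previous round.
	s := &switchState{groups: map[uint32]struct{}{1: {}}}

	fmt.Println(s.addGroup(1)) // group already exists

	s.clearGroups()            // what the Agent assumes has happened on restart
	fmt.Println(s.addGroup(1)) // <nil>
}
```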
6 changes: 4 additions & 2 deletions pkg/agent/openflow/client.go
@@ -753,8 +753,10 @@ func (c *client) ReplayFlows() {
return true
}

c.groupCache.Range(func(id, gEntry interface{}) bool {
if err := gEntry.(binding.Group).Add(); err != nil {
c.groupCache.Range(func(id, value interface{}) bool {
group := value.(binding.Group)
group.Reset()
if err := group.Add(); err != nil {
klog.Errorf("Error when replaying cached group %d: %v", id, err)
}
return true
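
The change to ReplayFlows is the core of the fix: each cached group is rebound to the new ofSwitch instance via Reset() before Add() re-installs it. Below is a minimal sketch of that pattern, using a sync.Map cache and stand-in types rather than the real binding.Group interface; all names other than Reset and Add are hypothetical.

```go
// Sketch of the replay pattern in ReplayFlows: every cached entry is rebound
// to the new switch connection (Reset) before being re-installed (Add).
package main

import (
	"fmt"
	"sync"
)

// replayable captures the two calls the replay loop relies on.
type replayable interface {
	Reset()     // re-create the underlying object against the new ofSwitch
	Add() error // push it to the freshly restarted bridge
}

// fakeGroup is a hypothetical implementation used only to exercise the loop.
type fakeGroup struct {
	id    uint32
	fresh bool
}

func (g *fakeGroup) Reset() { g.fresh = true }

func (g *fakeGroup) Add() error {
	if !g.fresh {
		return fmt.Errorf("group %d still bound to the stale switch", g.id)
	}
	return nil
}

func replayGroups(cache *sync.Map) {
	cache.Range(func(id, value interface{}) bool {
		group := value.(replayable)
		group.Reset() // without this, Add() would target the old connection
		if err := group.Add(); err != nil {
			fmt.Printf("error when replaying cached group %v: %v\n", id, err)
		}
		return true // keep iterating over the whole cache
	})
}

func main() {
	cache := &sync.Map{}
	cache.Store(uint32(1), &fakeGroup{id: 1})
	cache.Store(uint32(2), &fakeGroup{id: 2})
	replayGroups(cache) // no errors printed: both groups were reset first
}
```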
2 changes: 1 addition & 1 deletion pkg/ovs/openflow/ofctrl_bridge.go
@@ -150,7 +150,7 @@ type OFBridge struct {

func (b *OFBridge) CreateGroup(id GroupIDType) Group {
ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), ofctrl.GroupSelect)
if err != nil {
if err != nil { // group already exists
ofctrlGroup = b.ofSwitch.GetGroup(uint32(id))
}
g := &ofGroup{bridge: b, ofctrl: ofctrlGroup}
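
The comment added in CreateGroup documents that the only expected NewGroup error is a group with the same ID already being registered, in which case the existing ofctrl object is fetched instead. A small sketch of that create-or-get pattern follows, with a hypothetical in-memory registry standing in for the ofctrl switch.

```go
// Sketch of the create-or-get fallback used by OFBridge.CreateGroup, with a
// hypothetical registry instead of the real ofctrl switch object.
package main

import (
	"errors"
	"fmt"
)

type group struct{ id uint32 }

// registry stands in for the switch object, which keeps track of the groups
// created through it.
type registry struct{ groups map[uint32]*group }

var errExists = errors.New("group already exists")

func (r *registry) newGroup(id uint32) (*group, error) {
	if _, ok := r.groups[id]; ok {
		return nil, errExists
	}
	g := &group{id: id}
	r.groups[id] = g
	return g, nil
}

func (r *registry) getGroup(id uint32) *group { return r.groups[id] }

// createGroup mirrors the fallback in CreateGroup: creation is attempted
// first and, if the ID is already registered, the existing object is reused
// instead of failing.
func (r *registry) createGroup(id uint32) *group {
	g, err := r.newGroup(id)
	if err != nil { // group already exists
		g = r.getGroup(id)
	}
	return g
}

func main() {
	r := &registry{groups: map[uint32]*group{}}
	first := r.createGroup(7)
	second := r.createGroup(7) // falls back to the existing object
	fmt.Println(first == second) // true
}
```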
12 changes: 11 additions & 1 deletion pkg/ovs/openflow/ofctrl_group.go
@@ -26,8 +26,18 @@ type ofGroup struct {
bridge *OFBridge
}

// Reset creates a new ofctrl.Group object for the updated ofSwitch. The
// ofSwitch keeps a list of all Group objects, so this operation is
// needed. Reset() should be called before replaying the Group to OVS.
func (g *ofGroup) Reset() {
g.ofctrl.Switch = g.bridge.ofSwitch
// An error ("group already exists") is not possible here since we are
// using a new instance of ofSwitch and re-creating a group which was
// created successfully before. There will be no duplicate group IDs. If
// something is wrong and there is an error, g.ofctrl will be set to nil
// and the Agent will crash later.
newGroup, _ := g.bridge.ofSwitch.NewGroup(g.ofctrl.ID, g.ofctrl.GroupType)
newGroup.Buckets = g.ofctrl.Buckets
g.ofctrl = newGroup
}

func (g *ofGroup) Add() error {
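
Reset() re-creates the wrapped ofctrl.Group against the current ofSwitch, which is a new object after every reconnection and keeps its own registry of groups, and copies the buckets over so that a later Add() talks to the live connection. The self-contained sketch below illustrates that rebinding with stand-in types; it is not the real ofctrl API.

```go
// Sketch of what ofGroup.Reset accomplishes: the cached group is re-created
// on the new switch object and its buckets are carried over.
package main

import "fmt"

// ofSwitchStub stands in for the per-connection switch object: a new
// instance is created after every reconnection, with an empty group registry.
type ofSwitchStub struct {
	name   string
	groups map[uint32]*groupStub
}

// groupStub stands in for ofctrl.Group: it is owned by one switch instance.
type groupStub struct {
	sw      *ofSwitchStub
	id      uint32
	buckets []string
}

func (s *ofSwitchStub) newGroup(id uint32) *groupStub {
	g := &groupStub{sw: s, id: id}
	s.groups[id] = g
	return g
}

// cachedGroup mirrors ofGroup: it survives reconnections, while the group
// object it wraps does not.
type cachedGroup struct {
	ofctrl *groupStub
}

// reset re-creates the wrapped group on the new switch and carries the
// buckets over, which is essentially what Reset does before a replay.
func (g *cachedGroup) reset(newSwitch *ofSwitchStub) {
	newGroup := newSwitch.newGroup(g.ofctrl.id)
	newGroup.buckets = g.ofctrl.buckets
	g.ofctrl = newGroup
}

func main() {
	oldSwitch := &ofSwitchStub{name: "pre-restart", groups: map[uint32]*groupStub{}}
	cached := &cachedGroup{ofctrl: oldSwitch.newGroup(1)}
	cached.ofctrl.buckets = []string{"bucket-1", "bucket-2"}

	// After OVS restarts, a brand new switch object replaces the old one.
	newSwitch := &ofSwitchStub{name: "post-restart", groups: map[uint32]*groupStub{}}
	cached.reset(newSwitch)
	fmt.Println(cached.ofctrl.sw.name, cached.ofctrl.buckets) // post-restart [bucket-1 bucket-2]
}
```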
46 changes: 41 additions & 5 deletions test/e2e/connectivity_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
@@ -327,8 +328,9 @@ func TestOVSRestartSameNode(t *testing.T) {
}

// TestOVSFlowReplay checks that when OVS restarts unexpectedly the Antrea agent takes care of
// replaying flows. More precisely this test checks that Pod connectivity still works after deleting
// the flows and force-restarting the OVS dameons.
// replaying flows. More precisely this test checks that we have the same number of flows and groups
// after deleting them and force-restarting the OVS daemons. We also make sure that Pod connectivity
// still works.
func TestOVSFlowReplay(t *testing.T) {
skipIfProviderIs(t, "kind", "stopping OVS daemons create connectivity issues")
data, err := setupTest(t)
@@ -360,15 +362,45 @@ func TestOVSFlowReplay(t *testing.T) {
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)

t.Logf("Deleting flows and restarting OVS daemons on Node '%s'", workerNode)
delFlows := func() {
countFlows := func() int {
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName}
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
count := strings.Count(stdout, "\n")
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return count
}
countGroups := func() int {
cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName}
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping groups: <%v>, err: <%v>", stderr, err)
}
count := strings.Count(stdout, "\n")
t.Logf("Counted %d group in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return count
}

numFlows1, numGroups1 := countFlows(), countGroups()

// This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the
// bridge. An alternative may be to kill the antrea-ovs container running on that Node.
t.Logf("Deleting flows / groups and restarting OVS daemons on Node '%s'", workerNode)
delFlowsAndGroups := func() {
cmd := []string{"ovs-ofctl", "del-flows", defaultBridgeName}
_, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting flows: <%v>, err: <%v>", stderr, err)
}
cmd = []string{"ovs-ofctl", "del-groups", defaultBridgeName}
_, stderr, err = data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting groups: <%v>, err: <%v>", stderr, err)
}
}
delFlows()
delFlowsAndGroups()
restartCmd := []string{"/usr/share/openvswitch/scripts/ovs-ctl", "--system-id=random", "restart", "--db-file=/var/run/openvswitch/conf.db"}
if stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, restartCmd); err != nil {
t.Fatalf("Error when restarting OVS with ovs-ctl: %v - stdout: %s - stderr: %s", err, stdout, stderr)
@@ -378,6 +410,10 @@ func TestOVSFlowReplay(t *testing.T) {
// interval.
t.Logf("Running second ping mesh to check that flows have been restored")
data.runPingMesh(t, podNames)

numFlows2, numGroups2 := countFlows(), countGroups()
assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay")
assert.Equal(t, numGroups1, numGroups2, "Mismatch in OVS group count after flow replay")
}

// TestPingLargeMTU verifies that fragmented ICMP packets are handled correctly. Until OVS 2.12.0,
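
The new assertions compare flow and group counts before deletion and after replay, where each count is simply the number of lines in the corresponding ovs-ofctl dump. The small sketch below exercises that comparison on canned text; the sample output is illustrative only, and the assumption is that any header line printed by ovs-ofctl appears in both dumps, so it cancels out when the counts are compared.

```go
// Self-contained sketch of the counting approach used by countFlows and
// countGroups in the test: count newlines, then compare the before/after
// values for equality.
package main

import (
	"fmt"
	"strings"
)

// countLines mirrors the strings.Count(stdout, "\n") call in the test.
func countLines(stdout string) int {
	return strings.Count(stdout, "\n")
}

func main() {
	// Illustrative (not real) dump-flows output; the header line is present
	// in both dumps, so it does not affect the equality check.
	beforeRestart := "NXST_FLOW reply (xid=0x4):\n" +
		" table=0, priority=200,in_port=2 actions=resubmit(,10)\n" +
		" table=10, priority=0 actions=drop\n"
	afterReplay := "NXST_FLOW reply (xid=0x4):\n" +
		" table=0, priority=200,in_port=2 actions=resubmit(,10)\n" +
		" table=10, priority=0 actions=drop\n"

	n1, n2 := countLines(beforeRestart), countLines(afterReplay)
	fmt.Println(n1, n2, n1 == n2) // 3 3 true: the test asserts this kind of equality
}
```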