Fix OVS "flow" replay for groups #2134

Merged · 3 commits · Apr 29, 2021
Changes from 1 commit
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
@@ -427,7 +427,7 @@ func (i *Initializer) FlowRestoreComplete() error {
if err != nil {
if err == wait.ErrWaitTimeout {
// This could happen if the method is triggered by OVS disconnection event, in which OVS doesn't restart.
klog.Info("flow-restore-wait was not true, skip cleaning up it")
klog.Info("flow-restore-wait was not true, skip cleaning it up")
return nil
}
return err
6 changes: 4 additions & 2 deletions pkg/agent/openflow/client.go
@@ -753,8 +753,10 @@ func (c *client) ReplayFlows() {
return true
}

c.groupCache.Range(func(id, gEntry interface{}) bool {
if err := gEntry.(binding.Group).Add(); err != nil {
c.groupCache.Range(func(id, value interface{}) bool {
group := value.(binding.Group)
group.Reset()
if err := group.Add(); err != nil {
klog.Errorf("Error when replaying cached group %d: %v", id, err)
}
return true
2 changes: 1 addition & 1 deletion pkg/ovs/openflow/ofctrl_bridge.go
@@ -150,7 +150,7 @@ type OFBridge struct {

func (b *OFBridge) CreateGroup(id GroupIDType) Group {
ofctrlGroup, err := b.ofSwitch.NewGroup(uint32(id), ofctrl.GroupSelect)
if err != nil {
if err != nil { // group already exists
ofctrlGroup = b.ofSwitch.GetGroup(uint32(id))
}
g := &ofGroup{bridge: b, ofctrl: ofctrlGroup}
11 changes: 10 additions & 1 deletion pkg/ovs/openflow/ofctrl_group.go
@@ -26,8 +26,17 @@ type ofGroup struct {
bridge *OFBridge
}

// Reset creates a new ofctrl.Group object for the updated ofSwitch. The
// ofSwitch keeps a list of all Group objects, so this operation is
// needed. Reset() should be called before replaying the Group to OVS.
func (g *ofGroup) Reset() {
g.ofctrl.Switch = g.bridge.ofSwitch
// An error ("group already exists") is not possible here since the same
// group was created successfully before. If something is wrong and
Review comment (Member): Is the comment correct? It seems it is because the ofSwitch is a new instance.

Author reply (Contributor): I tried to clarify. I wanted to communicate two things: 1) it is a new ofSwitch, as you mention, but also 2) all the groups we are creating were created successfully before (with the previous ofSwitch instance), so their creation should succeed this time as well (no duplicate group IDs).

// there is an error, g.ofctrl will be set to nil and the Agent will
// crash later.
newGroup, _ := g.bridge.ofSwitch.NewGroup(g.ofctrl.ID, g.ofctrl.GroupType)
newGroup.Buckets = g.ofctrl.Buckets
g.ofctrl = newGroup
}

func (g *ofGroup) Add() error {
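To make the fix easier to follow, here is a condensed sketch (not part of the diff) of the group replay path after an OVS restart. All identifiers (client, groupCache, binding.Group, Reset, Add, klog) come from the Antrea code shown above; the standalone function name replayGroups is only illustrative, since the real logic lives inside ReplayFlows in pkg/agent/openflow/client.go.

// Illustrative sketch: replaying cached groups after the agent reconnects to a
// restarted OVS. The bridge now references a brand-new ofSwitch instance, so
// every cached group must be re-attached to it before being re-installed.
func (c *client) replayGroups() {
	c.groupCache.Range(func(id, value interface{}) bool {
		group := value.(binding.Group)
		// Reset() builds a fresh ofctrl.Group on the new ofSwitch, copying the
		// group ID, type and buckets from the old object. A "group already
		// exists" error cannot occur here: the new ofSwitch starts with no
		// groups, and every cached ID was unique on the previous instance.
		group.Reset()
		// Add() re-installs the group in OVS.
		if err := group.Add(); err != nil {
			klog.Errorf("Error when replaying cached group %d: %v", id, err)
		}
		return true
	})
}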
46 changes: 41 additions & 5 deletions test/e2e/connectivity_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
@@ -327,8 +328,9 @@ func TestOVSRestartSameNode(t *testing.T) {
}

// TestOVSFlowReplay checks that when OVS restarts unexpectedly the Antrea agent takes care of
// replaying flows. More precisely this test checks that Pod connectivity still works after deleting
// the flows and force-restarting the OVS dameons.
// replaying flows. More precisely this test checks that we have the same number of flows and groups
// after deleting them and force-restarting the OVS daemons. We also make sure that Pod connectivity
// still works.
func TestOVSFlowReplay(t *testing.T) {
skipIfProviderIs(t, "kind", "stopping OVS daemons create connectivity issues")
data, err := setupTest(t)
@@ -360,15 +362,45 @@ func TestOVSFlowReplay(t *testing.T) {
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)

t.Logf("Deleting flows and restarting OVS daemons on Node '%s'", workerNode)
delFlows := func() {
countFlows := func() int {
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName}
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
count := strings.Count(stdout, "\n")
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return count
}
countGroups := func() int {
cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName}
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping groups: <%v>, err: <%v>", stderr, err)
}
count := strings.Count(stdout, "\n")
t.Logf("Counted %d group in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return count
}

numFlows1, numGroups1 := countFlows(), countGroups()

// This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the
// bridge. An alternative may be to kill the antrea-ovs container running on that Node.
t.Logf("Deleting flows / groups and restarting OVS daemons on Node '%s'", workerNode)
delFlowsAndGroups := func() {
cmd := []string{"ovs-ofctl", "del-flows", defaultBridgeName}
_, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting flows: <%v>, err: <%v>", stderr, err)
}
cmd = []string{"ovs-ofctl", "del-groups", defaultBridgeName}
_, stderr, err = data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when deleting groups: <%v>, err: <%v>", stderr, err)
}
}
delFlows()
delFlowsAndGroups()
restartCmd := []string{"/usr/share/openvswitch/scripts/ovs-ctl", "--system-id=random", "restart", "--db-file=/var/run/openvswitch/conf.db"}
if stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, restartCmd); err != nil {
t.Fatalf("Error when restarting OVS with ovs-ctl: %v - stdout: %s - stderr: %s", err, stdout, stderr)
@@ -378,6 +410,10 @@ func TestOVSFlowReplay(t *testing.T) {
// interval.
t.Logf("Running second ping mesh to check that flows have been restored")
data.runPingMesh(t, podNames)

numFlows2, numGroups2 := countFlows(), countGroups()
assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay")
assert.Equal(t, numGroups1, numGroups2, "Mismatch in OVS group count after flow replay")
}

// TestPingLargeMTU verifies that fragmented ICMP packets are handled correctly. Until OVS 2.12.0,