Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bundle_timeout issue caused by OVS PacketIn channel #951

Merged
merged 1 commit into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ type Client interface {
// the old priority with the desired one, for each priority update.
ReassignFlowPriorities(updates map[uint16]uint16) error

// SubscribePacketIn subscribes packet-in channel in Bridge.
// SubscribePacketIn subscribes packet-in channel in bridge. This method requires a receiver to
// pop data from "ch" timely, otherwise it will block all inbound messages from OVS.
SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error

// SendTraceflowPacket injects packet to specified OVS port for Openflow.
Expand Down
44 changes: 36 additions & 8 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
package openflow

import (
"time"

"github.com/contiv/ofnet/ofctrl"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

Expand All @@ -31,6 +29,8 @@ type PacketInHandler interface {
const (
// Action explicitly output to controller.
ofprAction ofpPacketInReason = 1
// Max packetInQueue size.
packetInQueueSize int = 256
)

func (c *client) RegisterPacketInHandler(packetHandlerName string, packetInHandler interface{}) {
Expand All @@ -50,16 +50,44 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) {
err := c.SubscribePacketIn(uint8(ofprAction), ch)
if err != nil {
klog.Errorf("Subscribe PacketIn failed %+v", err)
return
}
packetInQueue := workqueue.NewNamed("packetIn")
gran-vmv marked this conversation as resolved.
Show resolved Hide resolved
go c.parsePacketIn(packetInQueue, stopCh)

for {
select {
case pktIn := <-ch:
// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
if packetInQueue.Len() < packetInQueueSize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a comment here. Especially because if someone refers to the K8s documentation (https://godoc.org/k8s.io/client-go/util/workqueue#Type.Len), they will see this:

Len returns the current queue length, for informational purposes only. You shouldn't e.g. gate a call to Add() or Get() on Len() being a particular value, that can't be synchronized properly.

Now in our case I think it's ok because we just try to ensure that the queue doesn't grow too big, we are not interested in providing an exact guarantee. But a comment would help avoid confusion.

@tnqn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Added comment here.

packetInQueue.Add(pktIn)
} else {
klog.Warningf("Max packetInQueue size exceeded.")
}
case <-stopCh:
packetInQueue.ShutDown()
break
}
}
}

wait.PollUntil(time.Second, func() (done bool, err error) {
pktIn := <-ch
func (c *client) parsePacketIn(packetInQueue workqueue.Interface, stopCh <-chan struct{}) {
for {
obj, quit := packetInQueue.Get()
if quit {
break
}
packetInQueue.Done(obj)
pktIn, ok := obj.(*ofctrl.PacketIn)
if !ok {
klog.Errorf("Invalid packet in data in queue, skipping.")
continue
}
for name, handler := range c.packetInHandlers {
err = handler.HandlePacketIn(pktIn)
err := handler.HandlePacketIn(pktIn)
if err != nil {
klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err)
}
}
return false, err
}, stopCh)
}
}