Skip to content

Commit

Permalink
Fix bundle_timeout issue caused by OVS PacketIn channel
Browse files Browse the repository at this point in the history
SubscribePacketIn requires a receiver to pop data in ch timely, otherwise
it will block all inbound messages from OVS.
  • Loading branch information
gran-vmv committed Jul 16, 2020
1 parent 41370d3 commit 04fb075
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/openflow/client.go
Expand Up @@ -176,7 +176,8 @@ type Client interface {
// the old priority with the desired one, for each priority update.
ReassignFlowPriorities(updates map[uint16]uint16) error

// SubscribePacketIn subscribes packet-in channel in Bridge.
// SubscribePacketIn subscribes packet-in channel in bridge. This method requires a receiver to
// pop data from "ch" timely, otherwise it will block all inbound messages from OVS.
SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error

// SendTraceflowPacket injects packet to specified OVS port for Openflow.
Expand Down
44 changes: 36 additions & 8 deletions pkg/agent/openflow/packetin.go
Expand Up @@ -15,10 +15,8 @@
package openflow

import (
"time"

"github.com/contiv/ofnet/ofctrl"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

Expand All @@ -31,6 +29,8 @@ type PacketInHandler interface {
const (
// Action explicitly output to controller.
ofprAction ofpPacketInReason = 1
// Max packetInQueue size.
packetInQueueSize int = 256
)

func (c *client) RegisterPacketInHandler(packetHandlerName string, packetInHandler interface{}) {
Expand All @@ -50,16 +50,44 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) {
err := c.SubscribePacketIn(uint8(ofprAction), ch)
if err != nil {
klog.Errorf("Subscribe PacketIn failed %+v", err)
return
}
packetInQueue := workqueue.NewNamed("packetIn")
go c.parsePacketIn(packetInQueue, stopCh)

for {
select {
case pktIn := <-ch:
// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
if packetInQueue.Len() < packetInQueueSize {
packetInQueue.Add(pktIn)
} else {
klog.Warningf("Max packetInQueue size exceeded.")
}
case <-stopCh:
packetInQueue.ShutDown()
break
}
}
}

wait.PollUntil(time.Second, func() (done bool, err error) {
pktIn := <-ch
func (c *client) parsePacketIn(packetInQueue workqueue.Interface, stopCh <-chan struct{}) {
for {
obj, quit := packetInQueue.Get()
if quit {
break
}
packetInQueue.Done(obj)
pktIn, ok := obj.(*ofctrl.PacketIn)
if !ok {
klog.Errorf("Invalid packet in data in queue, skipping.")
continue
}
for name, handler := range c.packetInHandlers {
err = handler.HandlePacketIn(pktIn)
err := handler.HandlePacketIn(pktIn)
if err != nil {
klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err)
}
}
return false, err
}, stopCh)
}
}

0 comments on commit 04fb075

Please sign in to comment.