Skip to content

Commit

Permalink
Fix bundle_timeout issue caused by OVS PacketIn channel
Browse files Browse the repository at this point in the history
SubscribePacketIn requires a receiver to pop data in ch timely, otherwise
it will block all inbound messages from OVS.
  • Loading branch information
gran-vmv committed Jul 14, 2020
1 parent 60fc26f commit e4068c2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ type Client interface {
// the old priority with the desired one, for each priority update.
ReassignFlowPriorities(updates map[uint16]uint16) error

// SubscribePacketIn subscribes packet-in channel in Bridge.
// SubscribePacketIn subscribes packet-in channel in Bridge. This method requires a receiver to
// pop data in ch timely, otherwise it will block all inbound messages from OVS.
SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error

// SendTraceflowPacket injects packet to specified OVS port for Openflow.
Expand Down
26 changes: 24 additions & 2 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/contiv/ofnet/ofctrl"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

Expand Down Expand Up @@ -50,16 +51,37 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) {
err := c.SubscribePacketIn(uint8(ofprAction), ch)
if err != nil {
klog.Errorf("Subscribe PacketIn failed %+v", err)
return
}
packetInQueue := workqueue.NewNamed("packetIn")
go c.parsePacketIn(packetInQueue, stopCh)

wait.PollUntil(time.Second, func() (done bool, err error) {
wait.PollUntil(time.Millisecond, func() (done bool, err error) {
pktIn := <-ch
packetInQueue.Add(pktIn)
return false, nil
}, stopCh)
packetInQueue.ShutDown()
}

func (c *client) parsePacketIn(packetInQueue workqueue.Interface, stopCh <-chan struct{}) {
wait.PollUntil(time.Millisecond, func() (done bool, err error) {
obj, quit := packetInQueue.Get()
if quit {
return true, nil
}
packetInQueue.Done(obj)
pktIn, ok := obj.(*ofctrl.PacketIn)
if !ok {
klog.Errorf("Invalid packet in data in queue, skipping.")
return false, nil
}
for name, handler := range c.packetInHandlers {
err = handler.HandlePacketIn(pktIn)
if err != nil {
klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err)
}
}
return false, err
return false, nil
}, stopCh)
}

0 comments on commit e4068c2

Please sign in to comment.