eventqueue: Forcefully drain to prevent deadlock
This commit ensures that the EventQueue is fully drained, even when it's
not running its loop. When endpoints are being restored, their
EventQueue is initialized but left in a non-running state (not processing
events). It is the job of the endpoint manager to kick off the event loop
by calling Expose() on the endpoint.

This commit fixes the following commits, which cause Cilium to be stuck
waiting for the EventQueue to drain (WaitToBeDrained()):

290d9e9 ("daemon: Init endpoint queue during validation")
79bf425 ("endpoint: Add function to initialize event queue")

Cilium becoming stuck is described in the following flow:
  - Endpoints begin restoration
  - Endpoint's EventQueue initialized (but never run)
  - Endpoint's metadata resolver controller kicked off
  - Visibility and bandwidth policy events enqueued
  - Endpoint fails restoration due to some issue (e.g. interface not
    found, etc)
  - Endpoint queued for deletion because it failed restoration
  - As part of endpoint deletion, the EventQueue is stopped and drained
  - Cilium deadlocks trying to drain, because the EventQueue run loop,
    which would pop events off the `events` channel and close the
    `eventsClosed` channel, was never run

This commit fixes this deadlock by forcefully running the event loop to
drain the queue. After the `events` channel is closed (from Stop()), the
loop will terminate and the `eventsClosed` channel will close, thereby
unblocking WaitToBeDrained().
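
The fix can be sketched as follows, assuming a simplified queue whose run
loop is guarded by a `sync.Once`. `onceQueue`, `waitToBeDrained` and
`drains` are hypothetical names for illustration; only the pattern (start
the guarded loop from the drain path, then wait) mirrors this commit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// onceQueue is a hypothetical, simplified queue. The sync.Once ensures the
// loop body executes at most once, no matter how many callers invoke run().
type onceQueue struct {
	events       chan int
	eventsClosed chan struct{}
	once         sync.Once
}

func newOnceQueue() *onceQueue {
	return &onceQueue{
		events:       make(chan int, 1),
		eventsClosed: make(chan struct{}),
	}
}

// run pops events until the channel is closed, then closes eventsClosed.
func (q *onceQueue) run() {
	q.once.Do(func() {
		defer close(q.eventsClosed)
		for range q.events {
		}
	})
}

func (q *onceQueue) stop() { close(q.events) }

// waitToBeDrained forcefully starts the loop in case nobody else did. If the
// loop is already running, the extra call does not start a second loop.
func (q *onceQueue) waitToBeDrained() {
	go q.run()
	<-q.eventsClosed
}

// drains reports whether stop + waitToBeDrained completes within a second.
func drains(started bool) bool {
	q := newOnceQueue()
	if started {
		go q.run()
	}
	q.events <- 1
	q.stop()

	done := make(chan struct{})
	go func() {
		q.waitToBeDrained()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("never run:", drains(false))
	fmt.Println("already running:", drains(true))
}
```

Because the loop body sits inside `once.Do`, `eventsClosed` is closed
exactly once in both cases, so the forced call cannot cause a double close.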

Stacktrace from `gops`:

```
goroutine 632 [chan receive, 1 minutes]:
github.com/cilium/cilium/pkg/eventqueue.(*EventQueue).WaitToBeDrained(0xc00013c960)
        /go/src/github.com/cilium/cilium/pkg/eventqueue/eventqueue.go:322 +0x1ad
github.com/cilium/cilium/pkg/endpoint.(*Endpoint).Delete(0xc000ad6900, 0x27faee0, 0xc00062ac40, 0x27fba60, 0xc00099a120, 0x2877280, 0xc0005d2340, 0x430101, 0x0, 0x0, ...)
        /go/src/github.com/cilium/cilium/pkg/endpoint/endpoint.go:2194 +0x91
github.com/cilium/cilium/daemon/cmd.(*Daemon).deleteEndpointQuiet(...)
        /go/src/github.com/cilium/cilium/daemon/cmd/endpoint.go:674
github.com/cilium/cilium/daemon/cmd.(*Daemon).regenerateRestoredEndpoints.func2(0xc00062ac40, 0xc000b63214, 0xc000ad6900)
        /go/src/github.com/cilium/cilium/daemon/cmd/state.go:302 +0x7c
created by github.com/cilium/cilium/daemon/cmd.(*Daemon).regenerateRestoredEndpoints
        /go/src/github.com/cilium/cilium/daemon/cmd/state.go:296 +0x8a0
```

Signed-off-by: Chris Tarazi <chris@isovalent.com>
christarazi authored and joestringer committed Oct 23, 2020
1 parent 253368a commit 1c0f00d
Showing 2 changed files with 34 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pkg/eventqueue/eventqueue.go
@@ -79,7 +79,6 @@ type EventQueue struct {
 // a time.
 func NewEventQueue() *EventQueue {
     return NewEventQueueBuffered("", 1)
-
 }

 // NewEventQueueBuffered returns an EventQueue with a capacity of,
@@ -97,7 +96,6 @@ func NewEventQueueBuffered(name string, numBufferedEvents int) *EventQueue {
         drain: make(chan struct{}),
         eventsClosed: make(chan struct{}),
     }
-
 }

 // Enqueue pushes the given event onto the EventQueue. If the queue has been
@@ -235,12 +233,15 @@ func (ev *Event) printStats(q *EventQueue) {
 // cancelled; any event which is currently being processed will not be
 // cancelled.
 func (q *EventQueue) Run() {
-
     if q.notSafeToAccess() {
         return
     }

-    go q.eventQueueOnce.Do(func() {
+    go q.run()
+}
+
+func (q *EventQueue) run() {
+    q.eventQueueOnce.Do(func() {
         defer close(q.eventsClosed)
         for ev := range q.events {
             select {
@@ -302,8 +303,11 @@ func (q *EventQueue) WaitToBeDrained() {
     }
     <-q.close

-    // In-flight events may still be running. Wait for them to be completed for
-    // the queue to be fully drained.
+    // If the queue is running, then in-flight events may still be ongoing.
+    // Wait for them to be completed for the queue to be fully drained. If the
+    // queue is not running, we must forcefully run it because nothing else
+    // will so that it can be drained.
+    go q.run()
     <-q.eventsClosed
 }

24 changes: 24 additions & 0 deletions pkg/eventqueue/eventqueue_test.go
@@ -213,3 +213,27 @@ func (s *EventQueueSuite) TestEnqueueTwice(c *C) {
     q.Stop()
     q.WaitToBeDrained()
 }
+
+func (s *EventQueueSuite) TestForcefulDraining(c *C) {
+    // This will test enqueuing an event when the queue was never run and was
+    // stopped and drained. The behavior expected is that the event will
+    // successfully be enqueued (channel returned is non-nil & no error), and
+    // after the event is stopped and drained, the returned channel will
+    // unblock.
+
+    q := NewEventQueue()
+
+    ev := NewEvent(&DummyEvent{})
+    res, err := q.Enqueue(ev)
+    c.Assert(res, Not(IsNil))
+    c.Assert(err, IsNil)
+
+    q.Stop()
+    q.WaitToBeDrained()
+
+    select {
+    case <-res:
+    case <-time.After(5 * time.Second):
+        c.Fail()
+    }
+}
