Skip to content

Commit

Permalink
hubble: Fix Race in Hubble Consumer
Browse files Browse the repository at this point in the history
When the hubble event consumer channel is full there is
a possible race condition. The code was structured such that
the consumer could end up losing the count of lost events, because
it was updating the count without a lock. Additionally, it could
reach a condition wherein it would reset the count and send a
notification of recovery more than once.

Finally, there was a log message for dropped events, but it was
logged at "debug" level rather than the warning level it should be.

Signed-off-by: Nate Sweet <nathanjsweet@pm.me>
  • Loading branch information
nathanjsweet committed May 3, 2021
1 parent efb3a72 commit c11c010
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions pkg/hubble/monitor/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/lock"
monitorConsumer "github.com/cilium/cilium/pkg/monitor/agent/consumer"
nodeTypes "github.com/cilium/cilium/pkg/node/types"

Expand All @@ -34,6 +35,7 @@ type Observer interface {
// consumer forwards monitor events to a Hubble Observer and keeps
// track of how many events were dropped while the observer's events
// channel was full.
type consumer struct {
	observer Observer
	// numEventsLost counts events dropped because the observer's events
	// channel was full. It is read and written under lostLock (see
	// sendNumLostEvents and logStartedDropping).
	numEventsLost uint64
	// lostLock guards numEventsLost so that concurrent droppers do not
	// lose counts or emit duplicate recovery notifications.
	lostLock lock.Mutex
}

// NewConsumer returns an initialized pointer to consumer.
Expand All @@ -49,6 +51,14 @@ func NewConsumer(observer Observer) monitorConsumer.MonitorConsumer {
// counter to the observer. If it succeeds in enqueueing a notification, it
// resets the counter.
func (c *consumer) sendNumLostEvents() {
c.lostLock.Lock()
defer c.lostLock.Unlock()
// check again, in case multiple
// routines contended the lock
if c.numEventsLost == 0 {
return
}

numEventsLostNotification := &observerTypes.MonitorEvent{
Timestamp: time.Now(),
NodeName: nodeTypes.GetName(),
Expand All @@ -62,6 +72,7 @@ func (c *consumer) sendNumLostEvents() {
// We can now safely reset the counter, as at this point we have
// successfully notified the observer about the amount of events
// that were lost since the previous LostEvent message
c.observer.GetLogger().Warningf("hubble events queue is processing messages again: %d messages were lost", c.numEventsLost)
c.numEventsLost = 0
default:
// We do not need to bump the numEventsLost counter here, as we will
Expand All @@ -77,13 +88,23 @@ func (c *consumer) sendEvent(event *observerTypes.MonitorEvent) {
if c.numEventsLost > 0 {
c.sendNumLostEvents()
}

select {
case c.observer.GetEventsChannel() <- event:
default:
c.observer.GetLogger().Debug("hubble events queue is full, dropping message")
c.numEventsLost++
c.logStartedDropping()
}
}

// logStartedDropping records that an event was dropped because the events
// channel is full. The first drop in a burst emits a warning; every drop
// increments numEventsLost so the observer can later be told exactly how
// many messages were lost before the consumer recovered.
func (c *consumer) logStartedDropping() {
	c.lostLock.Lock()
	defer c.lostLock.Unlock()
	firstDrop := c.numEventsLost == 0
	c.numEventsLost++
	if firstDrop {
		c.observer.GetLogger().Warning("hubble events queue is full; dropping messages")
	}
}

// NotifyAgentEvent implements monitorConsumer.MonitorConsumer
Expand Down

0 comments on commit c11c010

Please sign in to comment.