Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hubble: Fix Race in Hubble Consumer #15967

Merged
merged 1 commit into from
May 20, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 24 additions & 3 deletions pkg/hubble/monitor/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/lock"
monitorConsumer "github.com/cilium/cilium/pkg/monitor/agent/consumer"
nodeTypes "github.com/cilium/cilium/pkg/node/types"

Expand All @@ -34,6 +35,7 @@ type Observer interface {
// consumer forwards monitor events to an Observer and keeps count of the
// events it had to drop because they could not be enqueued.
type consumer struct {
// observer supplies the events channel and the logger used by the consumer.
observer Observer
// numEventsLost counts the events dropped since the last lost-events
// notification was successfully enqueued; it is reset to 0 at that point.
numEventsLost uint64
// lostLock is held while numEventsLost is updated or reset.
lostLock lock.Mutex
}

// NewConsumer returns an initialized pointer to consumer.
Expand All @@ -49,6 +51,14 @@ func NewConsumer(observer Observer) monitorConsumer.MonitorConsumer {
// counter to the observer. If it succeeds in enqueuing a notification, it
// resets the counter.
func (c *consumer) sendNumLostEvents() {
c.lostLock.Lock()
defer c.lostLock.Unlock()
// check again, in case multiple
// routines contended the lock
if c.numEventsLost == 0 {
return
}

numEventsLostNotification := &observerTypes.MonitorEvent{
Timestamp: time.Now(),
NodeName: nodeTypes.GetName(),
Expand All @@ -62,6 +72,7 @@ func (c *consumer) sendNumLostEvents() {
// We can now safely reset the counter, as at this point we have
// successfully notified the observer about the number of events
// that were lost since the previous LostEvent message
c.observer.GetLogger().Warningf("hubble events queue is processing messages again: %d messages were lost", c.numEventsLost)
c.numEventsLost = 0
default:
// We do not need to bump the numEventsLost counter here, as we will
Expand All @@ -77,13 +88,23 @@ func (c *consumer) sendEvent(event *observerTypes.MonitorEvent) {
if c.numEventsLost > 0 {
c.sendNumLostEvents()
}

select {
case c.observer.GetEventsChannel() <- event:
default:
c.observer.GetLogger().Debug("hubble events queue is full, dropping message")
c.numEventsLost++
c.logStartedDropping()
nathanjsweet marked this conversation as resolved.
Show resolved Hide resolved
}
}

// logStartedDropping records one dropped event. When the lost-events
// counter is still zero, i.e. this is the first drop since the counter
// was last reset, it also emits a warning that the events queue is full.
// The counter is read and incremented while holding lostLock.
func (c *consumer) logStartedDropping() {
	c.lostLock.Lock()
	defer c.lostLock.Unlock()

	// Warn only on the transition from "nothing lost" to "dropping";
	// subsequent drops are just counted.
	if firstDrop := c.numEventsLost == 0; firstDrop {
		c.observer.GetLogger().Warning("hubble events queue is full; dropping messages")
	}
	c.numEventsLost++
}

// NotifyAgentEvent implements monitorConsumer.MonitorConsumer
Expand Down