Skip to content

Commit

Permalink
eventqueue: Sort functions by importance
Browse files Browse the repository at this point in the history
This commit contains no functional changes. It just reorders some of the
functions by importance in this file so that it's easier to parse. This
should hopefully reduce the number of times needed to scroll up and
down.

Signed-off-by: Chris Tarazi <chris@isovalent.com>
  • Loading branch information
christarazi authored and joestringer committed Oct 23, 2020
1 parent 320b83f commit 253368a
Showing 1 changed file with 55 additions and 55 deletions.
110 changes: 55 additions & 55 deletions pkg/eventqueue/eventqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ func NewEventQueue() *EventQueue {

}

func (q *EventQueue) getLogger() *logrus.Entry {
return log.WithFields(
logrus.Fields{
"name": q.name,
})
}

// NewEventQueueBuffered returns an EventQueue with a capacity of,
// numBufferedEvents at a time, and all other needed fields initialized.
func NewEventQueueBuffered(name string, numBufferedEvents int) *EventQueue {
Expand All @@ -107,6 +100,54 @@ func NewEventQueueBuffered(name string, numBufferedEvents int) *EventQueue {

}

// Enqueue pushes the given event onto the EventQueue. If the queue has been
// stopped, the Event will not be enqueued, and its cancel channel will be
// closed, indicating that the Event was not ran. This function may block if
// the queue is at its capacity for events. If a single Event has Enqueue
// called on it multiple times asynchronously, there is no guarantee as to
// which one will return the channel which passes results back to the caller.
// It is up to the caller to check whether the returned channel is nil, as
// waiting to receive on such a channel will block forever. Returns an error
// if the Event has been previously enqueued, if the Event is nil, or the queue
// itself is not initialized properly.
func (q *EventQueue) Enqueue(ev *Event) (<-chan interface{}, error) {
if q.notSafeToAccess() || ev == nil {
return nil, fmt.Errorf("unable to Enqueue event")
}

// Events can only be enqueued once.
if atomic.AddInt32(&ev.enqueued, 1) > 1 {
return nil, fmt.Errorf("unable to Enqueue event; event has already had Enqueue called on it")
}

// Multiple Enqueues can occur at the same time. Ensure that events channel
// is not closed while we are enqueueing events.
q.eventsMu.RLock()
defer q.eventsMu.RUnlock()

select {
// The event should be drained from the queue (e.g., it should not be
// processed).
case <-q.drain:
// Closed eventResults channel signifies cancellation.
close(ev.cancelled)
close(ev.eventResults)

return ev.eventResults, nil
default:
// The events channel may be closed even if an event has been pushed
// onto the events channel, as events are consumed off of the events
// channel asynchronously! If the EventQueue is closed before this
// event is processed, then it will be cancelled.

ev.stats.waitEnqueue.Start()
ev.stats.waitConsumeOffQueue.Start()
q.events <- ev
ev.stats.waitEnqueue.End(true)
return ev.eventResults, nil
}
}

// Event is an event that can be enqueued onto an EventQueue.
type Event struct {
// Metadata is the information about the event which is sent
Expand Down Expand Up @@ -174,54 +215,6 @@ func (ev *Event) WasCancelled() bool {
}
}

// Enqueue pushes the given event onto the EventQueue. If the queue has been
// stopped, the Event will not be enqueued, and its cancel channel will be
// closed, indicating that the Event was not ran. This function may block if
// the queue is at its capacity for events. If a single Event has Enqueue
// called on it multiple times asynchronously, there is no guarantee as to
// which one will return the channel which passes results back to the caller.
// It is up to the caller to check whether the returned channel is nil, as
// waiting to receive on such a channel will block forever. Returns an error
// if the Event has been previously enqueued, if the Event is nil, or the queue
// itself is not initialized properly.
func (q *EventQueue) Enqueue(ev *Event) (<-chan interface{}, error) {
if q.notSafeToAccess() || ev == nil {
return nil, fmt.Errorf("unable to Enqueue event")
}

// Events can only be enqueued once.
if atomic.AddInt32(&ev.enqueued, 1) > 1 {
return nil, fmt.Errorf("unable to Enqueue event; event has already had Enqueue called on it")
}

// Multiple Enqueues can occur at the same time. Ensure that events channel
// is not closed while we are enqueueing events.
q.eventsMu.RLock()
defer q.eventsMu.RUnlock()

select {
// The event should be drained from the queue (e.g., it should not be
// processed).
case <-q.drain:
// Closed eventResults channel signifies cancellation.
close(ev.cancelled)
close(ev.eventResults)

return ev.eventResults, nil
default:
// The events channel may be closed even if an event has been pushed
// onto the events channel, as events are consumed off of the events
// channel asynchronously! If the EventQueue is closed before this
// event is processed, then it will be cancelled.

ev.stats.waitEnqueue.Start()
ev.stats.waitConsumeOffQueue.Start()
q.events <- ev
ev.stats.waitEnqueue.End(true)
return ev.eventResults, nil
}
}

func (ev *Event) printStats(q *EventQueue) {
if option.Config.Debug {
q.getLogger().WithFields(logrus.Fields{
Expand Down Expand Up @@ -314,6 +307,13 @@ func (q *EventQueue) WaitToBeDrained() {
<-q.eventsClosed
}

func (q *EventQueue) getLogger() *logrus.Entry {
return log.WithFields(
logrus.Fields{
"name": q.name,
})
}

// EventHandler is an interface for allowing an EventQueue to handle events
// in a generic way. To be processed by the EventQueue, all event types must
// implement any function specified in this interface.
Expand Down

0 comments on commit 253368a

Please sign in to comment.