Skip to content

Commit

Permalink
endpoint: Add function to initialize event queue
Browse files Browse the repository at this point in the history
This is useful during endpoint validation when endpoints are being
restored. When they are being restored, their event queue is not yet
initialized because they haven't been exposed to the endpoint manager.
It is important to initialize an endpoint's event queue so that events
are not missed during their restoration.

Signed-off-by: Chris Tarazi <chris@isovalent.com>
  • Loading branch information
christarazi authored and tklauser committed Oct 19, 2020
1 parent fad24e6 commit 79bf425
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion pkg/endpoint/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func (e *Endpoint) Expose(mgr endpointManager) error {
// its ID, and its eventqueue can be safely started. Ensure that it is only
// started once it is exposed to the endpointmanager so that it will be
// stopped when the endpoint is removed from the endpointmanager.
e.eventQueue = eventqueue.NewEventQueueBuffered(fmt.Sprintf("endpoint-%d", e.ID), option.Config.EndpointQueueSize)
if e.eventQueue == nil {
e.InitEventQueue()
}
e.eventQueue.Run()

mgr.AddIPv6Address(e.IPv6)
Expand Down Expand Up @@ -167,3 +169,17 @@ func (e *Endpoint) Unexpose(mgr endpointManager) <-chan struct{} {
e.removeReferences(mgr)
return epRemoved
}

// InitEventQueue initializes the endpoint's event queue. Note that this
// function does not begin processing events off the queue, as that's left up
// to the caller to call Expose in order to allow other subsystems to access
// the endpoint. This function assumes that the endpoint ID has already been
// allocated!
//
// Having this be a separate function allows us to prepare
// the event queue while the endpoint is being validated (during restoration)
// so that when its metadata is resolved, events can be enqueued (such as
// visibility policy and bandwidth policy).
func (e *Endpoint) InitEventQueue() {
e.eventQueue = eventqueue.NewEventQueueBuffered(fmt.Sprintf("endpoint-%d", e.ID), option.Config.EndpointQueueSize)
}

0 comments on commit 79bf425

Please sign in to comment.