Skip to content

Commit

Permalink
Changes from PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
rossjones committed Apr 10, 2024
1 parent 9cacc54 commit ff7828f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 30 deletions.
36 changes: 20 additions & 16 deletions pkg/node/manager/events.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:generate mockgen --source events.go --destination mocks.go --package manager
package manager

import (
Expand Down Expand Up @@ -48,20 +49,27 @@ func WithClock(clock clock.Clock) NodeEventEmitterOption {
}
}

// NodeEventHandler defines the interface for components which wish to respond to node events
type NodeEventHandler interface {
HandleNodeEvent(ctx context.Context, info models.NodeInfo, event NodeEvent)
}

// NodeEventEmitter is a struct that will be used to emit events and register callbacks for those events.
// Events will be emitted by the node manager when a node is approved or rejected, and the expectation
// is that they will be consumed by the evaluation broker to create new evaluations.
// It is safe for concurrent use.
type NodeEventEmitter struct {
mu sync.Mutex
callbacks []NodeEventHandler
clock clock.Clock
mu sync.Mutex
callbacks []NodeEventHandler
clock clock.Clock
emitTimeout time.Duration
}

func NewNodeEventEmitter(options ...NodeEventEmitterOption) *NodeEventEmitter {
emitter := &NodeEventEmitter{
callbacks: make([]NodeEventHandler, 0),
clock: clock.New(),
callbacks: make([]NodeEventHandler, 0),
clock: clock.New(),
emitTimeout: 1 * time.Second,
}

for _, option := range options {
Expand Down Expand Up @@ -104,16 +112,12 @@ func (e *NodeEventEmitter) EmitEvent(ctx context.Context, info models.NodeInfo,
wg.Wait()
}()

waitDuration := 1 * time.Second

for {
select {
case <-completedChan:
return nil
case <-ctx.Done():
return nil
case <-e.clock.After(waitDuration):
return fmt.Errorf("timed out waiting for %s event callbacks to complete", event.String())
}
select {
case <-completedChan:
return nil
case <-ctx.Done():
return ctx.Err()
case <-e.clock.After(e.emitTimeout):
return fmt.Errorf("timed out waiting for %s event callbacks to complete", event.String())
}
}
12 changes: 0 additions & 12 deletions pkg/node/manager/interfaces.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/node/manager/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/orchestrator/evaluation/inmemory_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,3 +823,5 @@ func (b *InMemoryBroker) HandleNodeEvent(ctx context.Context, info models.NodeIn

log.Ctx(ctx).Info().Msgf("Received node event %s for node %s", evt.String(), info.NodeID)
}

var _ manager.NodeEventHandler = &InMemoryBroker{}

0 comments on commit ff7828f

Please sign in to comment.