Skip to content

Commit

Permalink
events: Ensure pipelines are cleaned up on closing subscription (#23042)
Browse files Browse the repository at this point in the history
* events: Ensure pipelines are cleaned up on closing subscription
* Re-register formatter node on each subscribe
  • Loading branch information
tomhjp committed Sep 13, 2023
1 parent 27d647f commit 8e7c6e8
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 24 deletions.
4 changes: 4 additions & 0 deletions changelog/23042.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
events: Ensure subscription resources are cleaned up on close.
```

48 changes: 24 additions & 24 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ type asyncChanNode struct {
logger hclog.Logger

// used to close the connection
closeOnce sync.Once
cancelFunc context.CancelFunc
pipelineID eventlogger.PipelineID
broker *eventlogger.Broker
closeOnce sync.Once
cancelFunc context.CancelFunc
pipelineID eventlogger.PipelineID
removePipeline func(ctx context.Context, t eventlogger.EventType, id eventlogger.PipelineID) (bool, error)
}

var (
Expand Down Expand Up @@ -185,10 +185,6 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
return nil, err
}
formatterNodeID := eventlogger.NodeID(formatterID)
err = broker.RegisterNode(formatterNodeID, cloudEventsFormatterFilter)
if err != nil {
return nil, err
}

if logger == nil {
logger = hclog.Default().Named("events")
Expand Down Expand Up @@ -217,6 +213,11 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
return nil, nil, err
}

err = bus.broker.RegisterNode(bus.formatterNodeID, cloudEventsFormatterFilter)
if err != nil {
return nil, nil, err
}

filterNodeID, err := uuid.GenerateUUID()
if err != nil {
return nil, nil, err
Expand All @@ -237,7 +238,7 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
}

ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, bus.logger)
asyncNode := newAsyncNode(ctx, bus.logger, bus.broker)
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
defer cancel()
Expand Down Expand Up @@ -312,29 +313,28 @@ func newFilterNode(namespacePatterns []string, pattern string, bexprFilter strin
}, nil
}

func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.Broker) *asyncChanNode {
return &asyncChanNode{
ctx: ctx,
ch: make(chan *eventlogger.Event),
logger: logger,
ctx: ctx,
ch: make(chan *eventlogger.Event),
logger: logger,
removePipeline: broker.RemovePipelineAndNodes,
}
}

// Close tells the bus to stop sending us events.
func (node *asyncChanNode) Close(ctx context.Context) {
node.closeOnce.Do(func() {
defer node.cancelFunc()
if node.broker != nil {
isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID)

switch {
case err != nil && isPipelineRemoved:
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
node.logger.Warn(msg, err)
case err != nil:
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
node.logger.Warn(msg, err)
}
removed, err := node.removePipeline(ctx, eventTypeAll, node.pipelineID)

switch {
case err != nil && removed:
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
node.logger.Warn(msg, err)
case err != nil:
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
node.logger.Warn(msg, err)
}
addSubscriptions(-1)
})
Expand Down
27 changes: 27 additions & 0 deletions vault/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,30 @@ func TestBexpr(t *testing.T) {
})
}
}

// TestPipelineCleanedUp ensures pipelines are properly cleaned up after
// subscriptions are closed.
func TestPipelineCleanedUp(t *testing.T) {
bus, err := NewEventBus(nil)
if err != nil {
t.Fatal(err)
}

eventType := logical.EventType("someType")
bus.Start()

_, cancel, err := bus.Subscribe(context.Background(), namespace.RootNamespace, string(eventType), "")
if err != nil {
t.Fatal(err)
}
if !bus.broker.IsAnyPipelineRegistered(eventTypeAll) {
cancel()
t.Fatal()
}

cancel()

if bus.broker.IsAnyPipelineRegistered(eventTypeAll) {
t.Fatal()
}
}

0 comments on commit 8e7c6e8

Please sign in to comment.