feat: add structured logging and fix silent error suppression#5
feat: add structured logging and fix silent error suppression#5
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds structured logging using Go's standard log/slog package and fixes silent error suppression throughout the Bento workflow plugin. The changes address Issue #3 by integrating logging for stream lifecycle events (start, stop, error) and message throughput tracking.
Changes:
- Replaced silent error discarding (
_ = err) with proper slog-based error logging across all module types - Added structured logging at Info level for lifecycle events (start/stop) and Debug level for message throughput
- Introduced health status tracking with thread-safe state management for the stream module
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| go.sum | Added dependency checksums for workflow SDK and related packages |
| internal/trigger.go | Added slog import and logging for trigger lifecycle, subscription events, and workflow callback errors |
| internal/stream_module.go | Added status tracking (starting/running/stopped/errored) with RWMutex protection and lifecycle logging |
| internal/processor_step.go | Added logging for step execution, passthrough, and error conditions |
| internal/output_module.go | Added logging for output module lifecycle and message forwarding with error handling |
| internal/input_module.go | Added logging for input module lifecycle and message publishing with error handling |
| internal/broker_module.go | Added logging for broker stream lifecycle, creation, and message forwarding |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Add slog-based structured logging for stream lifecycle events - Fix silent error drops in stream.go and trigger.go - Add health status tracking per stream module - Log at appropriate levels: Info lifecycle, Debug per-message, Error failures Closes #3 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
b790fdb to
8007ca4
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 7 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
internal/trigger.go:170
- Start() can return early on subscription setup errors after one or more streams have already been built, appended to t.streams, and had stream.Run goroutines started. In that error path, Start returns without stopping the already-started streams and without closing t.done, leaving the trigger in a partially-started/leaking state. Consider tracking started streams in the loop and on error: stop them, cancel runCtx, wait for wg, and close t.done (or otherwise make Stop safe).
for i, sub := range t.subscriptions {
sub := sub // capture loop variable
idx := i
inputYAML, err := configToYAML(sub.inputConfig)
if err != nil {
cancel()
return fmt.Errorf("bento trigger: marshal input config for workflow %q: %w", sub.workflow, err)
}
builder := service.NewStreamBuilder()
builder.DisableLinting()
if err := builder.AddInputYAML(inputYAML); err != nil {
cancel()
return fmt.Errorf("bento trigger: add input yaml for workflow %q: %w", sub.workflow, err)
}
cb := t.callback
action := sub.action
workflow := sub.workflow
if err := builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error {
data, convErr := messageToMap(msg)
if convErr != nil {
slog.Error("failed to convert message", "error", convErr, "workflow", workflow)
return convErr
}
if workflow != "" {
data["workflow"] = workflow
}
slog.Debug("trigger firing workflow", "workflow", workflow, "action", action)
callbackErr := cb(action, data)
if callbackErr != nil {
slog.Error("workflow callback failed", "error", callbackErr, "workflow", workflow, "action", action)
}
return callbackErr
}); err != nil {
cancel()
return fmt.Errorf("bento trigger: add consumer for workflow %q: %w", sub.workflow, err)
}
stream, err := builder.Build()
if err != nil {
cancel()
return fmt.Errorf("bento trigger: build stream for workflow %q: %w", sub.workflow, err)
}
t.mu.Lock()
t.streams = append(t.streams, stream)
t.mu.Unlock()
slog.Info("bento trigger subscription started", "index", idx, "workflow", workflow, "action", action)
wg.Add(1)
go func() {
defer wg.Done()
if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil {
slog.Error("bento trigger stream runtime error", "workflow", workflow, "action", action, "error", err)
if cbErr := cb("stream_error", map[string]any{"error": err.Error(), "workflow": workflow, "action": action}); cbErr != nil {
slog.Error("bento trigger stream_error callback error", "workflow", workflow, "action", action, "callback_error", cbErr)
}
}
}()
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -35,29 +62,41 @@ func (m *streamModule) Init() error { | |||
|
|
|||
| // Start builds and runs the Bento stream. | |||
| func (m *streamModule) Start(ctx context.Context) error { | |||
| m.setStatus(streamStarting) | |||
| slog.Info("starting bento stream", "module", m.name) | |||
|
|
|||
| yamlStr, err := configToYAML(m.config) | |||
| if err != nil { | |||
| m.setStatus(streamErrored) | |||
| return fmt.Errorf("bento.stream %q: %w", m.name, err) | |||
| } | |||
|
|
|||
| builder := service.NewStreamBuilder() | |||
| if err := builder.SetYAML(yamlStr); err != nil { | |||
| m.setStatus(streamErrored) | |||
| return fmt.Errorf("bento.stream %q: set yaml: %w", m.name, err) | |||
| } | |||
|
|
|||
| stream, err := builder.Build() | |||
| if err != nil { | |||
| m.setStatus(streamErrored) | |||
| return fmt.Errorf("bento.stream %q: build stream: %w", m.name, err) | |||
| } | |||
| m.stream = stream | |||
|
|
|||
| runCtx, cancel := context.WithCancel(context.Background()) | |||
| m.cancel = cancel | |||
|
|
|||
| m.setStatus(streamRunning) | |||
| slog.Info("bento stream running", "module", m.name) | |||
|
|
|||
| go func() { | |||
| defer close(m.done) | |||
| if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { | |||
| slog.Error("bento stream runtime error", "name", m.name, "error", err) | |||
| m.setStatus(streamErrored) | |||
| slog.Error("bento stream failed", "error", err, "module", m.name) | |||
| } else { | |||
| m.setStatus(streamStopped) | |||
| } | |||
| }() | |||
There was a problem hiding this comment.
New lifecycle status tracking (Status()/setStatus and the starting/running/errored transitions) is not covered by tests. Please extend stream_module_test.go to assert the expected status transitions (including the error paths) so regressions are caught.
There was a problem hiding this comment.
Also, are status transitions a sign that we need a state machine or no?
| runCtx, cancel := context.WithCancel(context.Background()) | ||
| m.cancel = cancel | ||
|
|
||
| moduleName := m.name | ||
|
|
||
| go func() { | ||
| defer close(m.done) | ||
| if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { | ||
| slog.Error("bento output stream runtime error", "name", m.name, "error", err) | ||
| slog.Error("bento output stream failed", "error", err, "module", moduleName) | ||
| } | ||
| }() | ||
|
|
||
| slog.Info("bento output running", "module", m.name) | ||
|
|
||
| // Subscribe to the host EventBus topic. When messages arrive, forward them | ||
| // to the Bento producer. | ||
| producerFnRef := m.producerFn | ||
| if err := m.subscriber.Subscribe(m.sourceTopic, func(payload []byte, metadata map[string]string) error { | ||
| slog.Debug("sending message to bento output", "module", moduleName, "topic", m.sourceTopic, "size", len(payload)) | ||
|
|
||
| msg := service.NewMessage(payload) | ||
| for k, v := range metadata { | ||
| msg.MetaSet(k, v) | ||
| } | ||
| return producerFnRef(runCtx, msg) | ||
| sendErr := producerFnRef(runCtx, msg) | ||
| if sendErr != nil { | ||
| slog.Error("failed to send message to bento output", "error", sendErr, "module", moduleName) | ||
| } | ||
| return sendErr | ||
| }); err != nil { | ||
| return fmt.Errorf("bento.output %q: subscribe to topic %q: %w", m.name, m.sourceTopic, err) | ||
| } |
There was a problem hiding this comment.
If subscriber.Subscribe(...) fails, Start returns an error after the stream.Run goroutine has already been launched with a background-derived context. This leaves a running stream/goroutine with no way for the caller to clean it up. Consider subscribing before starting the stream, or ensure you Stop/cancel and wait for m.done before returning the subscribe error.
|
@copilot open a new pull request to apply changes based on the comments in this thread |
…tion tests (#12) * Initial plan * fix: address PR review feedback on output/broker modules and add status transition tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
|
@copilot open a new pull request to apply changes based on the comments in this thread |
…/goroutine ordering (#14) * Initial plan * fix: set streamErrored status in Stop() when stream.Stop() fails Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| runCtx, cancel := context.WithCancel(context.Background()) | ||
| m.cancel = cancel | ||
|
|
||
| go func() { | ||
| defer close(m.done) | ||
| if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { | ||
| slog.Error("bento output stream runtime error", "name", m.name, "error", err) | ||
| } | ||
| }() | ||
|
|
||
| // Subscribe to the host EventBus topic. When messages arrive, forward them | ||
| // to the Bento producer. | ||
| moduleName := m.name | ||
| producerFnRef := m.producerFn | ||
|
|
||
| // Subscribe before launching the goroutine so that if Subscribe fails we | ||
| // have not yet started the stream and there is no goroutine to clean up. | ||
| if err := m.subscriber.Subscribe(m.sourceTopic, func(payload []byte, metadata map[string]string) error { | ||
| slog.Debug("sending message to bento output", "module", moduleName, "topic", m.sourceTopic, "size", len(payload)) | ||
|
|
||
| msg := service.NewMessage(payload) | ||
| for k, v := range metadata { | ||
| msg.MetaSet(k, v) | ||
| } | ||
| return producerFnRef(runCtx, msg) | ||
| sendErr := producerFnRef(runCtx, msg) | ||
| if sendErr != nil { | ||
| slog.Error("failed to send message to bento output", "error", sendErr, "module", moduleName) | ||
| } | ||
| return sendErr | ||
| }); err != nil { | ||
| cancel() | ||
| return fmt.Errorf("bento.output %q: subscribe to topic %q: %w", m.name, m.sourceTopic, err) | ||
| } |
There was a problem hiding this comment.
In Start(), Subscribe is executed before the stream.Run goroutine is started. If Subscribe returns an error, Start exits without ever closing m.done; a subsequent Stop() will block waiting on m.done until the context times out. Also, messages could be delivered immediately after Subscribe succeeds but before stream.Run starts, potentially blocking inside producerFnRef(runCtx, msg) depending on Bento internals. Consider starting the stream goroutine before subscribing, and on Subscribe failure stop/cancel the stream and wait for the goroutine to exit (or ensure m.done is closed/initialized in the error path).
|
@copilot open a new pull request to apply changes based on the comments in this thread and #5 (review) |
…assertion (#16) * Initial plan * fix: goroutine ordering in output Start, log all trigger stop errors, assert Stop in test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Closes #3
Changes