
Fix silent error suppression in stream runners and triggers #4

Merged
intel352 merged 5 commits into main from copilot/fix-silent-error-suppression on Feb 23, 2026

Copilot AI commented Feb 23, 2026

Stream runtime errors in Bento goroutines were silently discarded (_ = err), making crashes invisible in production and nearly impossible to diagnose.

Changes

  • stream_module.go, input_module.go, output_module.go, broker_module.go — replace _ = err with slog.Error(...), including structured fields (name, topic, error) for easy log filtering
  • trigger.go — same logging fix, plus propagate errors to the trigger callback as a "stream_error" action so the host can react, and log any error returned by the callback:
if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil {
    slog.Error("bento trigger stream runtime error", "workflow", workflow, "action", action, "error", err)
    if cbErr := cb("stream_error", map[string]any{"error": err.Error(), "workflow": workflow, "action": action}); cbErr != nil {
        slog.Error("bento trigger stream_error callback error", "workflow", workflow, "action", action, "callback_error", cbErr)
    }
}

All logging uses log/slog (stdlib, Go 1.21+). Errors are only surfaced when the context was not cancelled — intentional shutdowns remain silent.

Original prompt

This section details the original issue you should resolve

<issue_title>Fix silent error suppression in stream runners and triggers</issue_title>
<issue_description>## Problem

Stream runtime errors are captured but silently discarded in multiple places:

  • stream.go:58-60: if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { _ = err }
  • trigger.go:149-151: Same pattern — Bento input crashes don't propagate

Stream crashes are invisible in production, making failures extremely difficult to diagnose.

Fix

  • Log errors at ERROR level using the host engine's logger
  • Emit error metrics/events for observability
  • For triggers: propagate errors to the trigger callback or error channel
  • Consider exposing stream health status via a health-check interface</issue_description>

Comments on the Issue (you are @copilot in this section)


Copilot AI and others added 2 commits February 23, 2026 07:45
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Copilot AI changed the title from "[WIP] Fix silent error suppression in stream runners and triggers" to "Fix silent error suppression in stream runners and triggers" Feb 23, 2026
Copilot AI requested a review from intel352 February 23, 2026 07:50
intel352 marked this pull request as ready for review February 23, 2026 08:13
Copilot AI review requested due to automatic review settings February 23, 2026 08:13
Copilot AI left a comment

Pull request overview

This PR addresses previously silent failures in Bento stream goroutines by surfacing runtime errors via structured logging, and (for triggers) notifying the host via a callback action so crashes become observable and actionable.

Changes:

  • Replace discarded stream.Run(...) errors with log/slog ERROR logs in stream/input/broker modules.
  • In the Bento trigger, log runtime errors and emit a "stream_error" action to the trigger callback.
  • Update go.sum to include newly resolved dependency sums.

Reviewed changes

Copilot reviewed 4 out of 5 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| internal/trigger.go | Logs trigger stream runtime errors and emits a "stream_error" callback action. |
| internal/stream_module.go | Logs stream module runtime errors instead of discarding them. |
| internal/input_module.go | Logs input module stream runtime errors instead of discarding them. |
| internal/broker_module.go | Logs broker per-topic stream runtime errors (includes topic/name fields). |
| go.sum | Adds missing sums for updated/resolved dependencies. |

@intel352

@copilot apply changes based on the comments in this thread

…ror suppression

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
The GoCodeAlone/yaegi v0.17.0 tag was moved, causing go.sum checksum
mismatches in CI. v0.17.1 was released to fix this.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@intel352 intel352 merged commit 7b85105 into main Feb 23, 2026
8 checks passed
intel352 added a commit that referenced this pull request Feb 24, 2026
#19, #20)

- #4  output_module: reinitialize done channel at Start for restart support
- #6  broker_module: copy streams map under lock then release before calling
      stream.Stop to prevent potential deadlock in Stop()
- #8  processor_step: use sync.Once in consumer func so fan-out (processor
      emitting multiple messages) does not block on the size-1 channel
- #9  processor_step: deferred cleanup selects on ctx.Done() when draining
      streamDone so a cancelled parent context does not cause an unconditional
      block
- #17 input_module: extract real transport type via inputTransportType() helper
      so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input"
- #18 output_module: extract real transport type via outputTransportType()
      helper so LogStreamStart logs the actual Bento output type
- #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal
      errors are surfaced instead of silently ignored; updated all call sites
- #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in
      stream_module_test, output_module_test, and broker_module_test
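
Item #6 above describes snapshotting the streams map under the lock and releasing it before stopping each stream. A minimal sketch of that idea follows; `broker`, `stopper`, and `reentrantStream` are hypothetical types written for illustration and are not taken from broker_module.go.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical stand-in for a per-topic stream.
type stopper interface{ Stop() }

type broker struct {
	mu      sync.Mutex
	streams map[string]stopper
}

// size takes the same lock as Stop, like any other broker method might.
func (b *broker) size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.streams)
}

// Stop copies the streams while holding the lock, releases it, and only then
// calls Stop on each one, so a stream that calls back into the broker cannot deadlock.
func (b *broker) Stop() int {
	b.mu.Lock()
	pending := make([]stopper, 0, len(b.streams))
	for _, s := range b.streams {
		pending = append(pending, s)
	}
	b.mu.Unlock()

	for _, s := range pending {
		s.Stop()
	}
	return len(pending)
}

// reentrantStream calls back into the broker from Stop. With sync.Mutex this
// would block forever if broker.Stop still held the lock at that point.
type reentrantStream struct {
	b       *broker
	stopped *int
}

func (r reentrantStream) Stop() {
	_ = r.b.size()
	*r.stopped++
}

func main() {
	stopped := 0
	b := &broker{streams: map[string]stopper{}}
	b.streams["orders"] = reentrantStream{b: b, stopped: &stopped}
	b.streams["payments"] = reentrantStream{b: b, stopped: &stopped}
	fmt.Println(b.Stop(), stopped)
}
```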

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
intel352 added a commit that referenced this pull request Feb 24, 2026
* feat: add structured logging and observability hooks (#3)

- Add internal/logger.go: bentoLogger wrapping log/slog with structured
  fields (component, name) and helpers LogStreamStart, LogStreamStop,
  LogStreamError, LogMessageProcessed, LogTopicEvent, LogProcessingStart,
  LogProcessingComplete, LogProcessingError
- Add internal/metrics.go: thread-safe StreamMetrics using sync/atomic
  for message-in/out and error counters, plus mutex-guarded start time and
  last-message-time; Snapshot() for point-in-time read
- Add internal/health.go: healthTracker derives HealthStatus (healthy /
  degraded / unhealthy) from metrics and running state; HealthReport struct
  for per-stream health reporting
- Update stream_module, input_module, output_module, broker_module:
  attach logger/metrics/health on construction; emit log lines at start,
  stop, error, and per-message events; expose Health() method
- Update processor_step: log processing start, complete, and error events
- Fix pre-existing lint issues: check MetaWalkMut return value, remove
  unused mapToMessage helper, nolint annotation on future-use ensureStream
- Add internal/logger_test.go: 21 unit tests covering log output format,
  metrics counting, health state transitions, and concurrent metric updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: format bridge.go

* chore: retrigger CI

* fix: apply observability PR review feedback (#9)

* Initial plan

* fix: apply PR review feedback for observability improvements

- Fix wrong comment on LogStreamStop: 'uptimeSeconds' -> 'messagesProcessed'
- Use slog.Default() in newLogger to respect global slog configuration
- Pass snap.Errors (not always-zero snap.MessagesIn) to LogStreamStop in stream_module
- Remove unnecessary mutex lock/unlock in newStreamMetrics initialization

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: observability lifecycle correctness — health on unexpected exit, uptime semantics, subscribe ordering (#13)

* Initial plan

* fix: apply review feedback - lifecycle hooks, health on unexpected exit, subscribe ordering

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: output stream cleanup on subscribe error + Health() test coverage for all modules (#15)

* Initial plan

* fix: stop built stream on subscribe error and add Health() tests for all modules

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: hot-path atomics, debug log guard, goroutine cleanup, deterministic health tests (#17)

* Initial plan

* fix: hot-path atomics, debug guard, goroutine cleanup, deterministic tests

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: address remaining PR #7 review comments (#4, #6, #8, #9, #17, #18, #19, #20)

- #4  output_module: reinitialize done channel at Start for restart support
- #6  broker_module: copy streams map under lock then release before calling
      stream.Stop to prevent potential deadlock in Stop()
- #8  processor_step: use sync.Once in consumer func so fan-out (processor
      emitting multiple messages) does not block on the size-1 channel
- #9  processor_step: deferred cleanup selects on ctx.Done() when draining
      streamDone so a cancelled parent context does not cause an unconditional
      block
- #17 input_module: extract real transport type via inputTransportType() helper
      so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input"
- #18 output_module: extract real transport type via outputTransportType()
      helper so LogStreamStart logs the actual Bento output type
- #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal
      errors are surfaced instead of silently ignored; updated all call sites
- #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in
      stream_module_test, output_module_test, and broker_module_test
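
The sync.Once fix in item #8 can be shown with a small standalone sketch. It assumes a consumer that hands its first message to a waiter through a channel of capacity 1; `firstResult` and `emit` are hypothetical names and do not come from processor_step.

```go
package main

import (
	"fmt"
	"sync"
)

// firstResult runs produce, which may call emit any number of times (fan-out),
// and returns the first emitted message. Later emits are dropped instead of
// blocking on the already-full size-1 channel.
func firstResult(produce func(emit func(string))) (string, bool) {
	out := make(chan string, 1)
	var once sync.Once

	emit := func(msg string) {
		// Only the first call sends; without Once the second send would block
		// forever because nothing reads the channel until produce returns.
		once.Do(func() { out <- msg })
	}

	produce(emit)

	select {
	case msg := <-out:
		return msg, true
	default:
		return "", false
	}
}

func main() {
	msg, ok := firstResult(func(emit func(string)) {
		emit("a")
		emit("b")
		emit("c")
	})
	fmt.Println(msg, ok)
}
```

Dropping the extra messages is a simplification for this sketch; whether the real step discards or collects them is not stated in the commit message.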

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>