feat: add structured logging and observability hooks#7
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds comprehensive structured logging and observability infrastructure to the Bento workflow plugin. It introduces standardized logging via log/slog, thread-safe metrics tracking using atomic operations, and health status reporting based on runtime state and error counts. The implementation follows a consistent pattern across all module types (stream, input, output, broker, processor) and includes comprehensive unit tests.
Changes:
- Added structured logging infrastructure using
log/slogwith consistent patterns for lifecycle events, message throughput, and errors - Implemented thread-safe metrics tracking (messages in/out, errors, uptime) using
sync/atomicfor counters and mutex-protected timestamps - Added health status reporting (healthy/degraded/unhealthy) with per-module Health() methods returning detailed reports
- Integrated observability hooks into all module types and processor steps
- Fixed three pre-existing lint issues (unchecked return value, unused function, missing nolint annotation)
- Added 21 comprehensive unit tests covering logging output, metrics counting, health transitions, and concurrent access
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/logger.go | New structured logger wrapper using slog with component-specific context fields and methods for stream lifecycle, message processing, and error logging |
| internal/metrics.go | New thread-safe metrics tracker using atomic counters for messages/errors and mutex for timestamps, with snapshot capability |
| internal/health.go | New health status tracker with three states (healthy/degraded/unhealthy) based on running state and error threshold |
| internal/stream_module.go | Integrated logging, metrics, and health tracking; added Health() method |
| internal/input_module.go | Integrated observability with message counting on publish; added Health() method |
| internal/output_module.go | Integrated observability with message counting on forward; added Health() method |
| internal/broker_module.go | Integrated observability across per-topic streams; added Health() method |
| internal/processor_step.go | Added logging for processing lifecycle (start/complete/error) |
| internal/bridge.go | Fixed lint issue by handling MetaWalkMut return value; removed unused mapToMessage function |
| internal/logger_test.go | Comprehensive test suite (21 tests) covering all logging methods, metrics operations, health transitions, and concurrent access with race detection |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@copilot open a new pull request to apply changes based on the comments in this thread |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
internal/broker_module.go:110
ensureStreamruns the per-topic stream with the caller-providedctx. For a cached, long-lived per-topic stream, using a request-scoped context would stop the stream as soon as that context is canceled. Consider deriving the stream context from the broker/module lifecycle (orcontext.Background()plus broker shutdown) rather than the call-site ctx.
// ensureStream returns (creating if necessary) a running stream for topic.
// This is used internally when the broker needs a dedicated in-process pipe.
//
//nolint:unused // Reserved for future use by broker consumers.
func (m *brokerModule) ensureStream(ctx context.Context, topic string) (*service.Stream, error) {
m.mu.RLock()
if s, ok := m.streams[topic]; ok {
m.mu.RUnlock()
return s, nil
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
8c10fc6 to
8695bee
Compare
|
@copilot open a new pull request to apply changes based on the comments in this thread |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@copilot open a new pull request to apply changes based on the comments in this thread and and |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@copilot open a new pull request to apply changes based on the comments in this thread |
- Add internal/logger.go: bentoLogger wrapping log/slog with structured fields (component, name) and helpers LogStreamStart, LogStreamStop, LogStreamError, LogMessageProcessed, LogTopicEvent, LogProcessingStart, LogProcessingComplete, LogProcessingError - Add internal/metrics.go: thread-safe StreamMetrics using sync/atomic for message-in/out and error counters, plus mutex-guarded start time and last-message-time; Snapshot() for point-in-time read - Add internal/health.go: healthTracker derives HealthStatus (healthy / degraded / unhealthy) from metrics and running state; HealthReport struct for per-stream health reporting - Update stream_module, input_module, output_module, broker_module: attach logger/metrics/health on construction; emit log lines at start, stop, error, and per-message events; expose Health() method - Update processor_step: log processing start, complete, and error events - Fix pre-existing lint issues: check MetaWalkMut return value, remove unused mapToMessage helper, nolint annotation on future-use ensureStream - Add internal/logger_test.go: 21 unit tests covering log output format, metrics counting, health state transitions, and concurrent metric updates Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* Initial plan * fix: apply PR review feedback for observability improvements - Fix wrong comment on LogStreamStop: 'uptimeSeconds' -> 'messagesProcessed' - Use slog.Default() in newLogger to respect global slog configuration - Pass snap.Errors (not always-zero snap.MessagesIn) to LogStreamStop in stream_module - Remove unnecessary mutex lock/unlock in newStreamMetrics initialization Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
… uptime semantics, subscribe ordering (#13) * Initial plan * fix: apply review feedback - lifecycle hooks, health on unexpected exit, subscribe ordering Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
…e for all modules (#15) * Initial plan * fix: stop built stream on subscribe error and add Health() tests for all modules Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
…tic health tests (#17) * Initial plan * fix: hot-path atomics, debug guard, goroutine cleanup, deterministic tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
e666723 to
dc4bda1
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
#19, #20) - #4 output_module: reinitialize done channel at Start for restart support - #6 broker_module: copy streams map under lock then release before calling stream.Stop to prevent potential deadlock in Stop() - #8 processor_step: use sync.Once in consumer func so fan-out (processor emitting multiple messages) does not block on the size-1 channel - #9 processor_step: deferred cleanup selects on ctx.Done() when draining streamDone so a cancelled parent context does not cause an unconditional block - #17 input_module: extract real transport type via inputTransportType() helper so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input" - #18 output_module: extract real transport type via outputTransportType() helper so LogStreamStart logs the actual Bento output type - #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal errors are surfaced instead of silently ignored; updated all call sites - #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in stream_module_test, output_module_test, and broker_module_test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
log/slogfor all module lifecycle events (stream start/stop/error, message processed, topic events, processing start/complete/error)sync/atomicfor thread safetystream_module,input_module,output_module,broker_module, andprocessor_stepMetaWalkMutreturn, unusedmapToMessagehelper, andnolintannotation on future-useensureStreamCloses #3
Test plan
-raceflaggolangci-lint runreports 0 issuesgo buildpasses🤖 Generated with Claude Code