From faf017bd4405439bbd2491450b65cacb6a2ca647 Mon Sep 17 00:00:00 2001 From: Oleg Obleukhov Date: Fri, 22 May 2026 07:16:29 -0700 Subject: [PATCH] add stats (#529) Summary: Add monitoring stats to ntripper Differential Revision: D106081876 --- cmd/ntripper/main.go | 40 +++++++++------ ntripper/stats/json.go | 92 ++++++++++++++++++++++++++++++++++ ntripper/stats/json_test.go | 98 +++++++++++++++++++++++++++++++++++++ ntripper/stats/stats.go | 31 ++++++++++++ 4 files changed, 247 insertions(+), 14 deletions(-) create mode 100644 ntripper/stats/json.go create mode 100644 ntripper/stats/json_test.go create mode 100644 ntripper/stats/stats.go diff --git a/cmd/ntripper/main.go b/cmd/ntripper/main.go index 2ad0cc9e..0d2af934 100644 --- a/cmd/ntripper/main.go +++ b/cmd/ntripper/main.go @@ -38,6 +38,7 @@ import ( "time" "github.com/facebook/time/ntrip" + "github.com/facebook/time/ntripper/stats" "github.com/facebook/time/rtcm" ) @@ -52,6 +53,7 @@ type config struct { proxyCert string proxyKey string reconnectInterval time.Duration + monitoringPort int logLevel string dryRun bool } @@ -79,6 +81,8 @@ func main() { "PEM private key for proxy TLS authentication (defaults to proxy-cert)") flag.DurationVar(&cfg.reconnectInterval, "reconnect-interval", 5*time.Second, "delay between reconnection attempts") + flag.IntVar(&cfg.monitoringPort, "monitoring-port", 8891, + "port for JSON monitoring HTTP server (0 to disable)") flag.StringVar(&cfg.logLevel, "log-level", "info", "log level (debug, info, warn, error)") flag.BoolVar(&cfg.dryRun, "dry-run", false, @@ -99,7 +103,12 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer cancel() - run(ctx, cfg, logger) + st := stats.NewJSONStats() + if cfg.monitoringPort > 0 { + go st.Start(cfg.monitoringPort) + } + + run(ctx, cfg, logger, st) } func setupLogger(level string) *slog.Logger { @@ -117,9 +126,9 @@ func setupLogger(level string) *slog.Logger { return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: lvl})) } -func run(ctx context.Context, cfg config, logger *slog.Logger) { +func run(ctx context.Context, cfg config, logger *slog.Logger, st *stats.JSONStats) { for ctx.Err() == nil { - err := runOnce(ctx, cfg, logger) + err := runOnce(ctx, cfg, logger, st) if err == nil || ctx.Err() != nil { return } @@ -129,6 +138,9 @@ func run(ctx context.Context, cfg config, logger *slog.Logger) { os.Exit(1) } + st.SetConnected(0) + st.IncReconnects() + logger.Warn("connection error, reconnecting", "error", err, "interval", cfg.reconnectInterval, @@ -141,27 +153,26 @@ func run(ctx context.Context, cfg config, logger *slog.Logger) { // runOnce connects to the socket and caster, then streams data until an error // occurs or the context is cancelled. -func runOnce(ctx context.Context, cfg config, logger *slog.Logger) error { - // Connect to oscillatord socket. +func runOnce(ctx context.Context, cfg config, logger *slog.Logger, st *stats.JSONStats) error { sockConn, err := connectSocket(ctx, cfg, logger) if err != nil { return fmt.Errorf("socket: %w", err) } defer sockConn.Close() + st.SetConnected(1) + if cfg.dryRun { - return printFrames(ctx, sockConn, logger) + return printFrames(ctx, sockConn, logger, st) } - // Connect to NTRIP caster. client, err := connectCaster(ctx, cfg, logger) if err != nil { return fmt.Errorf("caster: %w", err) } defer client.Close() - // Stream RTCM frames from socket to caster. - return streamFrames(ctx, sockConn, client, logger) + return streamFrames(ctx, sockConn, client, logger, st) } func connectSocket(ctx context.Context, cfg config, logger *slog.Logger) (net.Conn, error) { @@ -211,6 +222,7 @@ func streamFrames( sockConn net.Conn, client *ntrip.Client, logger *slog.Logger, + st *stats.JSONStats, ) error { scanner := rtcm.NewScanner(sockConn) var frameCount uint64 @@ -221,6 +233,8 @@ func streamFrames( } frame := scanner.Frame() + st.IncFramesReceived() + if _, err := client.Write(frame.Raw); err != nil { return fmt.Errorf("writing to caster: %w", err) } @@ -235,12 +249,10 @@ func streamFrames( return fmt.Errorf("reading from socket: %w", err) } - // Scanner returned false with nil error — clean EOF from socket. return fmt.Errorf("socket closed (EOF)") } -// printFrames reads RTCM frames from the socket and prints their details to stdout. -func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger) error { +func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger, st *stats.JSONStats) error { logger.Info("dry-run mode: printing frames to stdout") scanner := rtcm.NewScanner(sockConn) var frameCount uint64 @@ -252,6 +264,8 @@ func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger) er frame := scanner.Frame() frameCount++ + st.IncFramesReceived() + fmt.Printf("frame=%d type=%d len=%d\n", frameCount, frame.MessageType, len(frame.Raw)) } @@ -261,8 +275,6 @@ func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger) er return fmt.Errorf("socket closed (EOF)") } -// sleep waits for the specified duration or until the context is cancelled. -// Returns true if the sleep completed, false if interrupted. func sleep(ctx context.Context, d time.Duration) bool { timer := time.NewTimer(d) defer timer.Stop() diff --git a/ntripper/stats/json.go b/ntripper/stats/json.go new file mode 100644 index 00000000..1dc3112c --- /dev/null +++ b/ntripper/stats/json.go @@ -0,0 +1,92 @@ +/* +Copyright (c) Facebook, Inc. and its affiliates. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stats + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "sync/atomic" + "time" +) + +// JSONStats serves ntripper metrics as JSON over HTTP. +type JSONStats struct { + counters + + // framesPerSecond is computed by a background ticker. + framesPerSecond atomic.Int64 + lastFrames atomic.Int64 +} + +// NewJSONStats creates a new JSONStats instance. +func NewJSONStats() *JSONStats { + return &JSONStats{} +} + +// Start launches the HTTP server and a background goroutine that computes +// per-second rates. +func (s *JSONStats) Start(port int) { + go s.computeRates() + + mux := http.NewServeMux() + mux.HandleFunc("/", s.handleRequest) + addr := fmt.Sprintf(":%d", port) + slog.Info("starting JSON monitoring server", "addr", addr) + srv := &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + } + if err := srv.ListenAndServe(); err != nil { + slog.Error("monitoring server failed", "error", err) + } +} + +func (s *JSONStats) computeRates() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + cur := s.framesReceived.Load() + prev := s.lastFrames.Swap(cur) + s.framesPerSecond.Store(cur - prev) + } +} + +func (s *JSONStats) handleRequest(w http.ResponseWriter, _ *http.Request) { + snap := snapshot{ + FramesPerSecond: s.framesPerSecond.Load(), + Connected: s.connected.Load(), + Reconnects: s.reconnects.Load(), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(snap); err != nil { + slog.Error("failed to write monitoring response", "error", err) + } +} + +// IncFramesReceived increments the received frame counter. +func (s *JSONStats) IncFramesReceived() { s.framesReceived.Add(1) } + +// SetConnected sets connection status (1=connected, 0=disconnected). +func (s *JSONStats) SetConnected(v int64) { s.connected.Store(v) } + +// IncReconnects increments the reconnect counter. +func (s *JSONStats) IncReconnects() { s.reconnects.Add(1) } diff --git a/ntripper/stats/json_test.go b/ntripper/stats/json_test.go new file mode 100644 index 00000000..d03cc2fd --- /dev/null +++ b/ntripper/stats/json_test.go @@ -0,0 +1,98 @@ +/* +Copyright (c) Facebook, Inc. and its affiliates. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stats + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestJSONStatsCounters(t *testing.T) { + st := NewJSONStats() + + st.IncFramesReceived() + st.IncFramesReceived() + st.SetConnected(1) + + require.Equal(t, int64(2), st.framesReceived.Load()) + require.Equal(t, int64(1), st.connected.Load()) +} + +func TestJSONStatsHTTPEndpoint(t *testing.T) { + st := NewJSONStats() + + st.SetConnected(1) + + // Manually set the rate. + st.framesPerSecond.Store(42) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + st.handleRequest(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Equal(t, "application/json", rec.Header().Get("Content-Type")) + + var snap snapshot + err := json.Unmarshal(rec.Body.Bytes(), &snap) + require.NoError(t, err) + + require.Equal(t, int64(42), snap.FramesPerSecond) + require.Equal(t, int64(1), snap.Connected) +} + +func TestJSONStatsFramesPerSecond(t *testing.T) { + st := NewJSONStats() + + for range 10 { + st.IncFramesReceived() + } + + cur := st.framesReceived.Load() + prev := st.lastFrames.Swap(cur) + st.framesPerSecond.Store(cur - prev) + + require.Equal(t, int64(10), st.framesPerSecond.Load()) + + for range 5 { + st.IncFramesReceived() + } + + cur = st.framesReceived.Load() + prev = st.lastFrames.Swap(cur) + st.framesPerSecond.Store(cur - prev) + + require.Equal(t, int64(5), st.framesPerSecond.Load()) +} + +func TestComputeRatesIntegration(t *testing.T) { + st := NewJSONStats() + go st.computeRates() + + for range 20 { + st.IncFramesReceived() + } + + time.Sleep(1100 * time.Millisecond) + + require.Greater(t, st.framesPerSecond.Load(), int64(0)) +} diff --git a/ntripper/stats/stats.go b/ntripper/stats/stats.go new file mode 100644 index 00000000..5247d9be --- /dev/null +++ b/ntripper/stats/stats.go @@ -0,0 +1,31 @@ +/* +Copyright (c) Facebook, Inc. and its affiliates. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stats + +import "sync/atomic" + +type counters struct { + framesReceived atomic.Int64 + connected atomic.Int64 + reconnects atomic.Int64 +} + +type snapshot struct { + FramesPerSecond int64 `json:"frames_per_second"` + Connected int64 `json:"connected"` + Reconnects int64 `json:"reconnects"` +}