Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions cmd/ntripper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"time"

"github.com/facebook/time/ntrip"
"github.com/facebook/time/ntripper/stats"
"github.com/facebook/time/rtcm"
)

Expand All @@ -52,6 +53,7 @@ type config struct {
proxyCert string
proxyKey string
reconnectInterval time.Duration
monitoringPort int
logLevel string
dryRun bool
}
Expand Down Expand Up @@ -79,6 +81,8 @@ func main() {
"PEM private key for proxy TLS authentication (defaults to proxy-cert)")
flag.DurationVar(&cfg.reconnectInterval, "reconnect-interval", 5*time.Second,
"delay between reconnection attempts")
flag.IntVar(&cfg.monitoringPort, "monitoring-port", 8891,
"port for JSON monitoring HTTP server (0 to disable)")
flag.StringVar(&cfg.logLevel, "log-level", "info",
"log level (debug, info, warn, error)")
flag.BoolVar(&cfg.dryRun, "dry-run", false,
Expand All @@ -99,7 +103,12 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer cancel()

run(ctx, cfg, logger)
st := stats.NewJSONStats()
if cfg.monitoringPort > 0 {
go st.Start(cfg.monitoringPort)
}

run(ctx, cfg, logger, st)
}

func setupLogger(level string) *slog.Logger {
Expand All @@ -117,9 +126,9 @@ func setupLogger(level string) *slog.Logger {
return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: lvl}))
}

func run(ctx context.Context, cfg config, logger *slog.Logger) {
func run(ctx context.Context, cfg config, logger *slog.Logger, st *stats.JSONStats) {
for ctx.Err() == nil {
err := runOnce(ctx, cfg, logger)
err := runOnce(ctx, cfg, logger, st)
if err == nil || ctx.Err() != nil {
return
}
Expand All @@ -129,6 +138,9 @@ func run(ctx context.Context, cfg config, logger *slog.Logger) {
os.Exit(1)
}

st.SetConnected(0)
st.IncReconnects()

logger.Warn("connection error, reconnecting",
"error", err,
"interval", cfg.reconnectInterval,
Expand All @@ -141,27 +153,26 @@ func run(ctx context.Context, cfg config, logger *slog.Logger) {

// runOnce connects to the socket and caster, then streams data until an error
// occurs or the context is cancelled.
func runOnce(ctx context.Context, cfg config, logger *slog.Logger) error {
// Connect to oscillatord socket.
func runOnce(ctx context.Context, cfg config, logger *slog.Logger, st *stats.JSONStats) error {
sockConn, err := connectSocket(ctx, cfg, logger)
if err != nil {
return fmt.Errorf("socket: %w", err)
}
defer sockConn.Close()

st.SetConnected(1)

if cfg.dryRun {
return printFrames(ctx, sockConn, logger)
return printFrames(ctx, sockConn, logger, st)
}

// Connect to NTRIP caster.
client, err := connectCaster(ctx, cfg, logger)
if err != nil {
return fmt.Errorf("caster: %w", err)
}
defer client.Close()

// Stream RTCM frames from socket to caster.
return streamFrames(ctx, sockConn, client, logger)
return streamFrames(ctx, sockConn, client, logger, st)
}

func connectSocket(ctx context.Context, cfg config, logger *slog.Logger) (net.Conn, error) {
Expand Down Expand Up @@ -211,6 +222,7 @@ func streamFrames(
sockConn net.Conn,
client *ntrip.Client,
logger *slog.Logger,
st *stats.JSONStats,
) error {
scanner := rtcm.NewScanner(sockConn)
var frameCount uint64
Expand All @@ -221,6 +233,8 @@ func streamFrames(
}

frame := scanner.Frame()
st.IncFramesReceived()

if _, err := client.Write(frame.Raw); err != nil {
return fmt.Errorf("writing to caster: %w", err)
}
Expand All @@ -235,12 +249,10 @@ func streamFrames(
return fmt.Errorf("reading from socket: %w", err)
}

// Scanner returned false with nil error — clean EOF from socket.
return fmt.Errorf("socket closed (EOF)")
}

// printFrames reads RTCM frames from the socket and prints their details to stdout.
func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger) error {
func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger, st *stats.JSONStats) error {
logger.Info("dry-run mode: printing frames to stdout")
scanner := rtcm.NewScanner(sockConn)
var frameCount uint64
Expand All @@ -252,6 +264,8 @@ func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger) er

frame := scanner.Frame()
frameCount++
st.IncFramesReceived()

fmt.Printf("frame=%d type=%d len=%d\n", frameCount, frame.MessageType, len(frame.Raw))
}

Expand All @@ -261,8 +275,6 @@ func printFrames(ctx context.Context, sockConn net.Conn, logger *slog.Logger) er
return fmt.Errorf("socket closed (EOF)")
}

// sleep waits for the specified duration or until the context is cancelled.
// Returns true if the sleep completed, false if interrupted.
func sleep(ctx context.Context, d time.Duration) bool {
timer := time.NewTimer(d)
defer timer.Stop()
Expand Down
92 changes: 92 additions & 0 deletions ntripper/stats/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright (c) Facebook, Inc. and its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stats

import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync/atomic"
"time"
)

// JSONStats serves ntripper metrics as JSON over HTTP.
type JSONStats struct {
counters

// framesPerSecond is computed by a background ticker.
framesPerSecond atomic.Int64
lastFrames atomic.Int64
}

// NewJSONStats creates a new JSONStats instance.
func NewJSONStats() *JSONStats {
return &JSONStats{}
}

// Start launches the HTTP server and a background goroutine that computes
// per-second rates.
func (s *JSONStats) Start(port int) {
go s.computeRates()

mux := http.NewServeMux()
mux.HandleFunc("/", s.handleRequest)
addr := fmt.Sprintf(":%d", port)
slog.Info("starting JSON monitoring server", "addr", addr)
srv := &http.Server{
Addr: addr,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
}
if err := srv.ListenAndServe(); err != nil {
slog.Error("monitoring server failed", "error", err)
}
}

func (s *JSONStats) computeRates() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
cur := s.framesReceived.Load()
prev := s.lastFrames.Swap(cur)
s.framesPerSecond.Store(cur - prev)
}
}

func (s *JSONStats) handleRequest(w http.ResponseWriter, _ *http.Request) {
snap := snapshot{
FramesPerSecond: s.framesPerSecond.Load(),
Connected: s.connected.Load(),
Reconnects: s.reconnects.Load(),
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(snap); err != nil {
slog.Error("failed to write monitoring response", "error", err)
}
}

// IncFramesReceived increments the received frame counter.
func (s *JSONStats) IncFramesReceived() { s.framesReceived.Add(1) }

// SetConnected sets connection status (1=connected, 0=disconnected).
func (s *JSONStats) SetConnected(v int64) { s.connected.Store(v) }

// IncReconnects increments the reconnect counter.
func (s *JSONStats) IncReconnects() { s.reconnects.Add(1) }
98 changes: 98 additions & 0 deletions ntripper/stats/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright (c) Facebook, Inc. and its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stats

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestJSONStatsCounters(t *testing.T) {
st := NewJSONStats()

st.IncFramesReceived()
st.IncFramesReceived()
st.SetConnected(1)

require.Equal(t, int64(2), st.framesReceived.Load())
require.Equal(t, int64(1), st.connected.Load())
}

func TestJSONStatsHTTPEndpoint(t *testing.T) {
st := NewJSONStats()

st.SetConnected(1)

// Manually set the rate.
st.framesPerSecond.Store(42)

rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/", nil)
st.handleRequest(rec, req)

require.Equal(t, http.StatusOK, rec.Code)
require.Equal(t, "application/json", rec.Header().Get("Content-Type"))

var snap snapshot
err := json.Unmarshal(rec.Body.Bytes(), &snap)
require.NoError(t, err)

require.Equal(t, int64(42), snap.FramesPerSecond)
require.Equal(t, int64(1), snap.Connected)
}

func TestJSONStatsFramesPerSecond(t *testing.T) {
st := NewJSONStats()

for range 10 {
st.IncFramesReceived()
}

cur := st.framesReceived.Load()
prev := st.lastFrames.Swap(cur)
st.framesPerSecond.Store(cur - prev)

require.Equal(t, int64(10), st.framesPerSecond.Load())

for range 5 {
st.IncFramesReceived()
}

cur = st.framesReceived.Load()
prev = st.lastFrames.Swap(cur)
st.framesPerSecond.Store(cur - prev)

require.Equal(t, int64(5), st.framesPerSecond.Load())
}

func TestComputeRatesIntegration(t *testing.T) {
st := NewJSONStats()
go st.computeRates()

for range 20 {
st.IncFramesReceived()
}

time.Sleep(1100 * time.Millisecond)

require.Greater(t, st.framesPerSecond.Load(), int64(0))
}
31 changes: 31 additions & 0 deletions ntripper/stats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
Copyright (c) Facebook, Inc. and its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stats

import "sync/atomic"

type counters struct {
framesReceived atomic.Int64
connected atomic.Int64
reconnects atomic.Int64
}

type snapshot struct {
FramesPerSecond int64 `json:"frames_per_second"`
Connected int64 `json:"connected"`
Reconnects int64 `json:"reconnects"`
}
Loading