From c6fb2515424b31f413deeae631b7bd44313a4c53 Mon Sep 17 00:00:00 2001 From: Nanook Date: Sun, 24 May 2026 04:44:26 +0000 Subject: [PATCH] fix: warn on hub sync and replication failures Signed-off-by: Nanook --- internal/cli/system/core/hubsync/doc.go | 8 +- internal/cli/system/core/hubsync/sync.go | 18 ++- internal/cli/system/core/hubsync/sync_test.go | 125 ++++++++++++++++++ internal/config/warn/warn.go | 45 +++++++ internal/hub/replicate.go | 33 ++++- internal/hub/replicate_test.go | 48 +++++++ internal/log/warn/warn.go | 17 +++ specs/hubsync-replication-warnings.md | 25 ++++ 8 files changed, 315 insertions(+), 4 deletions(-) create mode 100644 internal/cli/system/core/hubsync/sync_test.go create mode 100644 internal/hub/replicate_test.go create mode 100644 specs/hubsync-replication-warnings.md diff --git a/internal/cli/system/core/hubsync/doc.go b/internal/cli/system/core/hubsync/doc.go index 07f821077..21ae712a2 100644 --- a/internal/cli/system/core/hubsync/doc.go +++ b/internal/cli/system/core/hubsync/doc.go @@ -23,8 +23,12 @@ // writes them to disk via the connection render layer. // It returns a formatted status message with the count // of synced entries, or an empty string when nothing -// was fetched. Every error is silently swallowed so the -// hook never blocks the session start. +// was fetched. Every error path emits a warning via +// [warn.Warn] but returns an empty string so the hook +// never blocks the session start: warn, do not block. +// An empty result (zero entries returned) is not an +// error and produces no warning, distinguishing "nothing +// new to fetch" from "sync failed." // // The data flow is: // diff --git a/internal/cli/system/core/hubsync/sync.go b/internal/cli/system/core/hubsync/sync.go index 3f4963e2e..20071eabb 100644 --- a/internal/cli/system/core/hubsync/sync.go +++ b/internal/cli/system/core/hubsync/sync.go @@ -18,7 +18,9 @@ import ( "github.com/ActiveMemory/ctx/internal/cli/connection/core/render" "github.com/ActiveMemory/ctx/internal/config/embed/text" cfgHub "github.com/ActiveMemory/ctx/internal/config/hub" + cfgWarn "github.com/ActiveMemory/ctx/internal/config/warn" "github.com/ActiveMemory/ctx/internal/hub" + "github.com/ActiveMemory/ctx/internal/log/warn" ) // Connected reports whether a hub connection config exists. @@ -56,6 +58,13 @@ func Connected(ctxDir string) (bool, error) { // and a formatted status message, or empty string if no // new entries. // +// Every error path emits a warning via [warn.Warn] and +// returns an empty string so the calling hook never blocks +// the session start. Sync errors and empty results are +// deliberately distinct: an empty result is not a failure +// and produces no warning, while a real sync RPC failure +// is logged so operators can see why the hub is silent. +// // Parameters: // - sessionID: current session ID (unused, for future) // @@ -64,6 +73,7 @@ func Connected(ctxDir string) (bool, error) { func Sync(_ string) string { cfg, loadErr := connectCfg.Load() if loadErr != nil { + warn.Warn(cfgWarn.HubsyncLoad, loadErr) return "" } @@ -71,6 +81,7 @@ func Sync(_ string) string { cfg.HubAddr, cfg.Token, ) if dialErr != nil { + warn.Warn(cfgWarn.HubsyncDial, cfg.HubAddr, dialErr) return "" } defer func() { _ = client.Close() }() @@ -78,11 +89,16 @@ func Sync(_ string) string { entries, syncErr := client.Sync( context.Background(), cfg.Types, 0, ) - if syncErr != nil || len(entries) == 0 { + if syncErr != nil { + warn.Warn(cfgWarn.HubsyncSync, cfg.HubAddr, syncErr) + return "" + } + if len(entries) == 0 { return "" } if writeErr := render.WriteEntries(entries); writeErr != nil { + warn.Warn(cfgWarn.HubsyncWrite, len(entries), writeErr) return "" } diff --git a/internal/cli/system/core/hubsync/sync_test.go b/internal/cli/system/core/hubsync/sync_test.go new file mode 100644 index 000000000..50f40fc5b --- /dev/null +++ b/internal/cli/system/core/hubsync/sync_test.go @@ -0,0 +1,125 @@ +// / ctx: https://ctx.ist +// ,'`./ do you remember? +// `.,'\ +// \ Copyright 2026-present Context contributors. +// SPDX-License-Identifier: Apache-2.0 + +package hubsync_test + +import ( + "bytes" + "os" + "path/filepath" + "strings" + "testing" + + connectCfg "github.com/ActiveMemory/ctx/internal/cli/connection/core/config" + "github.com/ActiveMemory/ctx/internal/cli/system/core/hubsync" + "github.com/ActiveMemory/ctx/internal/config/dir" + "github.com/ActiveMemory/ctx/internal/config/fs" + "github.com/ActiveMemory/ctx/internal/crypto" + "github.com/ActiveMemory/ctx/internal/log/warn" + "github.com/ActiveMemory/ctx/internal/testutil/testctx" +) + +// TestSync_WarnsOnLoadError verifies that the session-start +// hub sync hook emits a warning via [warn.Warn] when the +// connection config cannot be loaded, instead of swallowing +// the error and producing an empty status. The "warn but do +// not block" contract is documented in the package doc; this +// test is the regression guard for ActiveMemory/ctx#100. +func TestSync_WarnsOnLoadError(t *testing.T) { + tempDir := t.TempDir() + testctx.Declare(t, tempDir) + + var buf bytes.Buffer + restore := warn.SetSinkForTesting(&buf) + defer restore() + + msg := hubsync.Sync("session-id-ignored") + if msg != "" { + t.Errorf("Sync should return empty on load error, got %q", msg) + } + + got := buf.String() + if !strings.Contains(got, "hubsync: load connection config:") { + t.Errorf( + "warning output missing hubsync load prefix; got %q", + got, + ) + } +} + +// TestSync_WarnsOnDialError verifies that hubsync emits a +// warning when grpc.NewClient rejects a malformed hub +// address. "%" reliably trips the URL parser inside +// grpc.NewClient ("invalid URL escape"), exercising the +// dial-error branch without needing a network listener. +// Anything reachable enough to satisfy NewClient (like +// "dns:///") would defer the failure to the Sync RPC and +// hit a different warn path; this test is specifically the +// regression guard for the dial-error branch. +func TestSync_WarnsOnDialError(t *testing.T) { + tempDir := t.TempDir() + ctxDir := testctx.Declare(t, tempDir) + if mkErr := os.Mkdir(ctxDir, fs.PermKeyDir); mkErr != nil { + t.Fatal(mkErr) + } + writeTestKey(t, tempDir) + if saveErr := connectCfg.Save(connectCfg.Config{ + HubAddr: "%", + Token: "test-token", + }); saveErr != nil { + t.Fatal(saveErr) + } + + var buf bytes.Buffer + restore := warn.SetSinkForTesting(&buf) + defer restore() + + msg := hubsync.Sync("session-id-ignored") + if msg != "" { + t.Errorf("Sync should return empty on dial error, got %q", msg) + } + + got := buf.String() + if !strings.Contains(got, "hubsync: dial %:") { + t.Errorf( + "warning output missing hubsync dial prefix; got %q", + got, + ) + } +} + +// TestSync_NonBlockingOnLoadError verifies the second half of +// the contract: even when the load fails and a warning is +// emitted, Sync returns without panicking or propagating an +// error to the caller. The check-hub-sync hook depends on +// this invariant to never block session start. +func TestSync_NonBlockingOnLoadError(t *testing.T) { + tempDir := t.TempDir() + testctx.Declare(t, tempDir) + + restore := warn.SetSinkForTesting(&bytes.Buffer{}) + defer restore() + + // The bare fact that Sync returns at all (rather than + // panicking or hanging) is the assertion. The returned + // string is checked separately in TestSync_WarnsOnLoadError. + _ = hubsync.Sync("") +} + +func writeTestKey(t *testing.T, home string) { + t.Helper() + key, genErr := crypto.GenerateKey() + if genErr != nil { + t.Fatal(genErr) + } + keyDir := filepath.Join(home, dir.CtxData) + if mkErr := os.Mkdir(keyDir, fs.PermKeyDir); mkErr != nil { + t.Fatal(mkErr) + } + if saveErr := crypto.SaveKey(crypto.GlobalKeyPath(), key); saveErr != nil { + t.Fatal(saveErr) + } +} diff --git a/internal/config/warn/warn.go b/internal/config/warn/warn.go index 835815d0d..00bfe6811 100644 --- a/internal/config/warn/warn.go +++ b/internal/config/warn/warn.go @@ -114,6 +114,51 @@ const ( // until the tombstone line is removed. SteeringUnfilled = "skipping unfilled steering file %s " + "(remove the tombstone line to activate)" + + // HubsyncLoad is the stderr format for connection config + // load failures in the session-start hub sync hook. Takes + // (error). + HubsyncLoad = "hubsync: load connection config: %v" + + // HubsyncDial is the stderr format for hub dial failures + // in the session-start hub sync hook. Takes (addr, error). + HubsyncDial = "hubsync: dial %s: %v" + + // HubsyncSync is the stderr format for sync RPC failures + // in the session-start hub sync hook. Takes (addr, error). + HubsyncSync = "hubsync: sync from %s: %v" + + // HubsyncWrite is the stderr format for failures writing + // fetched entries to .context/hub/. Takes (count, error). + HubsyncWrite = "hubsync: write %d entries: %v" + + // ReplicateDial is the stderr format for replication + // gRPC dial failures. Takes (masterAddr, error). + ReplicateDial = "replication: dial %s: %v" + + // ReplicateStream is the stderr format for replication + // stream open failures. Takes (masterAddr, error). + ReplicateStream = "replication: open stream %s: %v" + + // ReplicateSend is the stderr format for replication + // SyncRequest send failures. Takes (masterAddr, + // sinceSequence, error). + ReplicateSend = "replication: send sync request to %s " + + "(since=%d): %v" + + // ReplicateCloseSend is the stderr format for + // replication CloseSend failures. Takes (masterAddr, + // error). + ReplicateCloseSend = "replication: close send to %s: %v" + + // ReplicateRecv is the stderr format for replication + // receive failures. Takes (masterAddr, error). + ReplicateRecv = "replication: recv from %s: %v" + + // ReplicateAppend is the stderr format for replication + // append failures. Takes (entryID, sequence, error). + ReplicateAppend = "replication: append entry %s " + + "(seq=%d): %v" ) // Warn context identifiers for index generation. diff --git a/internal/hub/replicate.go b/internal/hub/replicate.go index 85bc4b761..765c79a9d 100644 --- a/internal/hub/replicate.go +++ b/internal/hub/replicate.go @@ -11,6 +11,8 @@ import ( "time" cfgHub "github.com/ActiveMemory/ctx/internal/config/hub" + cfgWarn "github.com/ActiveMemory/ctx/internal/config/warn" + "github.com/ActiveMemory/ctx/internal/log/warn" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -58,6 +60,13 @@ func startReplication( // replicateOnce connects to the master, syncs all entries // since the local store's last sequence, and appends them. // +// Every error path emits a warning via [warn.Warn] and +// returns so the outer [startReplication] loop can retry on +// the next tick. Append failures are the one exception: the +// receive loop continues after warning so a single bad +// write does not block subsequent entries from being +// consumed. +// // Parameters: // - ctx: context for cancellation // - masterAddr: gRPC address of the master hub @@ -79,6 +88,7 @@ func replicateOnce( ), ) if dialErr != nil { + warn.Warn(cfgWarn.ReplicateDial, masterAddr, dialErr) return } defer func() { _ = conn.Close() }() @@ -92,21 +102,32 @@ func replicateOnce( cfgHub.PathSync, ) if streamErr != nil { + warn.Warn(cfgWarn.ReplicateStream, masterAddr, streamErr) return } if sendErr := stream.SendMsg(&SyncRequest{ SinceSequence: lastSeq, }); sendErr != nil { + warn.Warn( + cfgWarn.ReplicateSend, masterAddr, lastSeq, sendErr, + ) return } if closeErr := stream.CloseSend(); closeErr != nil { + warn.Warn( + cfgWarn.ReplicateCloseSend, masterAddr, closeErr, + ) return } for { msg := &EntryMsg{} if recvErr := stream.RecvMsg(msg); recvErr != nil { + if eof(recvErr) { + return + } + warn.Warn(cfgWarn.ReplicateRecv, masterAddr, recvErr) return } entry := Entry{ @@ -118,6 +139,16 @@ func replicateOnce( Timestamp: time.Unix(msg.Timestamp, 0), Sequence: msg.Sequence, } - _, _ = store.Append([]Entry{entry}) + if _, appendErr := store.Append( + []Entry{entry}, + ); appendErr != nil { + warn.Warn( + cfgWarn.ReplicateAppend, + entry.ID, entry.Sequence, appendErr, + ) + // Keep consuming: the next entry may succeed, + // and aborting the loop on a single bad write + // would silently drop everything queued behind it. + } } } diff --git a/internal/hub/replicate_test.go b/internal/hub/replicate_test.go new file mode 100644 index 000000000..0f7401e3e --- /dev/null +++ b/internal/hub/replicate_test.go @@ -0,0 +1,48 @@ +// / ctx: https://ctx.ist +// ,'`./ do you remember? +// `.,'\ +// \ Copyright 2026-present Context contributors. +// SPDX-License-Identifier: Apache-2.0 + +package hub + +import ( + "bytes" + "context" + "strings" + "testing" + + "github.com/ActiveMemory/ctx/internal/log/warn" +) + +// TestReplicateOnce_WarnsOnDialError verifies that +// replicateOnce emits a warning via [warn.Warn] when the +// gRPC NewClient call fails on a malformed target. The "%" +// literal trips grpc.NewClient's URL parser ("invalid URL +// escape") before any network I/O - the most reliable way +// to hit the dial-error branch without standing up a +// listener. Regression guard for ActiveMemory/ctx#100: +// pre-fix, dial failures returned silently and the +// follower would loop forever with no operator-visible +// signal that the master address was unreachable or +// malformed. +func TestReplicateOnce_WarnsOnDialError(t *testing.T) { + store, storeErr := NewStore(t.TempDir()) + if storeErr != nil { + t.Fatal(storeErr) + } + + var buf bytes.Buffer + restore := warn.SetSinkForTesting(&buf) + defer restore() + + replicateOnce(context.Background(), "%", store, "token") + + got := buf.String() + if !strings.Contains(got, "replication: dial %:") { + t.Errorf( + "warning output missing replication dial prefix; got %q", + got, + ) + } +} diff --git a/internal/log/warn/warn.go b/internal/log/warn/warn.go index d88abfa84..6b29c8c25 100644 --- a/internal/log/warn/warn.go +++ b/internal/log/warn/warn.go @@ -35,3 +35,20 @@ func Warn(format string, args ...any) { _, _ = fmt.Fprintf( sink, cfgCtx.StderrPrefix+format+token.NewlineLF, args...) } + +// SetSinkForTesting swaps the warn sink for the duration of a +// test and returns a restore function the caller must defer. +// The package-level sink is otherwise unexported because there +// is no production reason to redirect it; tests that assert +// against captured warning output go through this helper. +// +// Parameters: +// - w: writer to receive warnings during the test +// +// Returns: +// - func(): restores the previous sink when called +func SetSinkForTesting(w io.Writer) func() { + prev := sink + sink = w + return func() { sink = prev } +} diff --git a/specs/hubsync-replication-warnings.md b/specs/hubsync-replication-warnings.md new file mode 100644 index 000000000..4e50a7169 --- /dev/null +++ b/specs/hubsync-replication-warnings.md @@ -0,0 +1,25 @@ +# Hubsync And Replication Warnings + +Governing issue: ActiveMemory/ctx#100. + +## Problem + +The session-start hub sync hook and hub replication loop intentionally avoid +blocking callers when a best-effort sync fails. Several failure paths currently +return silently, making operators see no synced entries without any diagnostic +that distinguishes "nothing new" from "sync failed." + +## Requirements + +- Preserve non-blocking behavior: failures still return an empty hubsync status + or let the replication loop retry later. +- Emit stderr warnings via `internal/log/warn.Warn` on hubsync load, dial, sync, + and write failures. +- Do not warn when hubsync returns zero entries successfully. +- Emit stderr warnings via `internal/log/warn.Warn` on replication-loop failures, + including the dial failure path. +- Keep warning text local and literal unless an existing convention clearly + requires shared constants. +- Document hubsync's warn-not-block behavior in the package documentation. +- Add focused tests for warning output where practical, especially hubsync + load/dial failures and replication dial failure.