Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/cli/system/core/hubsync/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
// writes them to disk via the connection render layer.
// It returns a formatted status message with the count
// of synced entries, or an empty string when nothing
// was fetched. Every error is silently swallowed so the
// hook never blocks the session start.
// was fetched. Every error path emits a warning via
// [warn.Warn] but returns an empty string so the hook
// never blocks the session start: warn, do not block.
// An empty result (zero entries returned) is not an
// error and produces no warning, distinguishing "nothing
// new to fetch" from "sync failed."
//
// The data flow is:
//
Expand Down
18 changes: 17 additions & 1 deletion internal/cli/system/core/hubsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/ActiveMemory/ctx/internal/cli/connection/core/render"
"github.com/ActiveMemory/ctx/internal/config/embed/text"
cfgHub "github.com/ActiveMemory/ctx/internal/config/hub"
cfgWarn "github.com/ActiveMemory/ctx/internal/config/warn"
"github.com/ActiveMemory/ctx/internal/hub"
"github.com/ActiveMemory/ctx/internal/log/warn"
)

// Connected reports whether a hub connection config exists.
Expand Down Expand Up @@ -56,6 +58,13 @@ func Connected(ctxDir string) (bool, error) {
// and a formatted status message, or empty string if no
// new entries.
//
// Every error path emits a warning via [warn.Warn] and
// returns an empty string so the calling hook never blocks
// the session start. Sync errors and empty results are
// deliberately distinct: an empty result is not a failure
// and produces no warning, while a real sync RPC failure
// is logged so operators can see why the hub is silent.
//
// Parameters:
// - sessionID: current session ID (unused, for future)
//
Expand All @@ -64,25 +73,32 @@ func Connected(ctxDir string) (bool, error) {
func Sync(_ string) string {
cfg, loadErr := connectCfg.Load()
if loadErr != nil {
warn.Warn(cfgWarn.HubsyncLoad, loadErr)
return ""
}

client, dialErr := hub.NewClient(
cfg.HubAddr, cfg.Token,
)
if dialErr != nil {
warn.Warn(cfgWarn.HubsyncDial, cfg.HubAddr, dialErr)
return ""
}
defer func() { _ = client.Close() }()

entries, syncErr := client.Sync(
context.Background(), cfg.Types, 0,
)
if syncErr != nil || len(entries) == 0 {
if syncErr != nil {
warn.Warn(cfgWarn.HubsyncSync, cfg.HubAddr, syncErr)
return ""
}
if len(entries) == 0 {
return ""
}

if writeErr := render.WriteEntries(entries); writeErr != nil {
warn.Warn(cfgWarn.HubsyncWrite, len(entries), writeErr)
return ""
}

Expand Down
125 changes: 125 additions & 0 deletions internal/cli/system/core/hubsync/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// / ctx: https://ctx.ist
// ,'`./ do you remember?
// `.,'\
// \ Copyright 2026-present Context contributors.
// SPDX-License-Identifier: Apache-2.0

package hubsync_test

import (
"bytes"
"os"
"path/filepath"
"strings"
"testing"

connectCfg "github.com/ActiveMemory/ctx/internal/cli/connection/core/config"
"github.com/ActiveMemory/ctx/internal/cli/system/core/hubsync"
"github.com/ActiveMemory/ctx/internal/config/dir"
"github.com/ActiveMemory/ctx/internal/config/fs"
"github.com/ActiveMemory/ctx/internal/crypto"
"github.com/ActiveMemory/ctx/internal/log/warn"
"github.com/ActiveMemory/ctx/internal/testutil/testctx"
)

// TestSync_WarnsOnLoadError verifies that the session-start
// hub sync hook emits a warning via [warn.Warn] when the
// connection config cannot be loaded, instead of swallowing
// the error and producing an empty status. The "warn but do
// not block" contract is documented in the package doc; this
// test is the regression guard for ActiveMemory/ctx#100.
func TestSync_WarnsOnLoadError(t *testing.T) {
tempDir := t.TempDir()
testctx.Declare(t, tempDir)

var buf bytes.Buffer
restore := warn.SetSinkForTesting(&buf)
defer restore()

msg := hubsync.Sync("session-id-ignored")
if msg != "" {
t.Errorf("Sync should return empty on load error, got %q", msg)
}

got := buf.String()
if !strings.Contains(got, "hubsync: load connection config:") {
t.Errorf(
"warning output missing hubsync load prefix; got %q",
got,
)
}
}

// TestSync_WarnsOnDialError verifies that hubsync emits a
// warning when grpc.NewClient rejects a malformed hub
// address. "%" reliably trips the URL parser inside
// grpc.NewClient ("invalid URL escape"), exercising the
// dial-error branch without needing a network listener.
// Anything reachable enough to satisfy NewClient (like
// "dns:///") would defer the failure to the Sync RPC and
// hit a different warn path; this test is specifically the
// regression guard for the dial-error branch.
func TestSync_WarnsOnDialError(t *testing.T) {
tempDir := t.TempDir()
ctxDir := testctx.Declare(t, tempDir)
if mkErr := os.Mkdir(ctxDir, fs.PermKeyDir); mkErr != nil {
t.Fatal(mkErr)
}
writeTestKey(t, tempDir)
if saveErr := connectCfg.Save(connectCfg.Config{
HubAddr: "%",
Token: "test-token",
}); saveErr != nil {
t.Fatal(saveErr)
}

var buf bytes.Buffer
restore := warn.SetSinkForTesting(&buf)
defer restore()

msg := hubsync.Sync("session-id-ignored")
if msg != "" {
t.Errorf("Sync should return empty on dial error, got %q", msg)
}

got := buf.String()
if !strings.Contains(got, "hubsync: dial %:") {
t.Errorf(
"warning output missing hubsync dial prefix; got %q",
got,
)
}
}

// TestSync_NonBlockingOnLoadError verifies the second half of
// the contract: even when the load fails and a warning is
// emitted, Sync returns without panicking or propagating an
// error to the caller. The check-hub-sync hook depends on
// this invariant to never block session start.
func TestSync_NonBlockingOnLoadError(t *testing.T) {
tempDir := t.TempDir()
testctx.Declare(t, tempDir)

restore := warn.SetSinkForTesting(&bytes.Buffer{})
defer restore()

// The bare fact that Sync returns at all (rather than
// panicking or hanging) is the assertion. The returned
// string is checked separately in TestSync_WarnsOnLoadError.
_ = hubsync.Sync("")
}

func writeTestKey(t *testing.T, home string) {
t.Helper()
key, genErr := crypto.GenerateKey()
if genErr != nil {
t.Fatal(genErr)
}
keyDir := filepath.Join(home, dir.CtxData)
if mkErr := os.Mkdir(keyDir, fs.PermKeyDir); mkErr != nil {
t.Fatal(mkErr)
}
if saveErr := crypto.SaveKey(crypto.GlobalKeyPath(), key); saveErr != nil {
t.Fatal(saveErr)
}
}
45 changes: 45 additions & 0 deletions internal/config/warn/warn.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,51 @@ const (
// until the tombstone line is removed.
SteeringUnfilled = "skipping unfilled steering file %s " +
"(remove the tombstone line to activate)"

// HubsyncLoad is the stderr format for connection config
// load failures in the session-start hub sync hook. Takes
// (error).
HubsyncLoad = "hubsync: load connection config: %v"

// HubsyncDial is the stderr format for hub dial failures
// in the session-start hub sync hook. Takes (addr, error).
HubsyncDial = "hubsync: dial %s: %v"

// HubsyncSync is the stderr format for sync RPC failures
// in the session-start hub sync hook. Takes (addr, error).
HubsyncSync = "hubsync: sync from %s: %v"

// HubsyncWrite is the stderr format for failures writing
// fetched entries to .context/hub/. Takes (count, error).
HubsyncWrite = "hubsync: write %d entries: %v"

// ReplicateDial is the stderr format for replication
// gRPC dial failures. Takes (masterAddr, error).
ReplicateDial = "replication: dial %s: %v"

// ReplicateStream is the stderr format for replication
// stream open failures. Takes (masterAddr, error).
ReplicateStream = "replication: open stream %s: %v"

// ReplicateSend is the stderr format for replication
// SyncRequest send failures. Takes (masterAddr,
// sinceSequence, error).
ReplicateSend = "replication: send sync request to %s " +
"(since=%d): %v"

// ReplicateCloseSend is the stderr format for
// replication CloseSend failures. Takes (masterAddr,
// error).
ReplicateCloseSend = "replication: close send to %s: %v"

// ReplicateRecv is the stderr format for replication
// receive failures. Takes (masterAddr, error).
ReplicateRecv = "replication: recv from %s: %v"

// ReplicateAppend is the stderr format for replication
// append failures. Takes (entryID, sequence, error).
ReplicateAppend = "replication: append entry %s " +
"(seq=%d): %v"
)

// Warn context identifiers for index generation.
Expand Down
33 changes: 32 additions & 1 deletion internal/hub/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"

cfgHub "github.com/ActiveMemory/ctx/internal/config/hub"
cfgWarn "github.com/ActiveMemory/ctx/internal/config/warn"
"github.com/ActiveMemory/ctx/internal/log/warn"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -58,6 +60,13 @@ func startReplication(
// replicateOnce connects to the master, syncs all entries
// since the local store's last sequence, and appends them.
//
// Every error path emits a warning via [warn.Warn] and
// returns so the outer [startReplication] loop can retry on
// the next tick. Append failures are the one exception: the
// receive loop continues after warning so a single bad
// write does not block subsequent entries from being
// consumed.
//
// Parameters:
// - ctx: context for cancellation
// - masterAddr: gRPC address of the master hub
Expand All @@ -79,6 +88,7 @@ func replicateOnce(
),
)
if dialErr != nil {
warn.Warn(cfgWarn.ReplicateDial, masterAddr, dialErr)
return
}
defer func() { _ = conn.Close() }()
Expand All @@ -92,21 +102,32 @@ func replicateOnce(
cfgHub.PathSync,
)
if streamErr != nil {
warn.Warn(cfgWarn.ReplicateStream, masterAddr, streamErr)
return
}

if sendErr := stream.SendMsg(&SyncRequest{
SinceSequence: lastSeq,
}); sendErr != nil {
warn.Warn(
cfgWarn.ReplicateSend, masterAddr, lastSeq, sendErr,
)
return
}
if closeErr := stream.CloseSend(); closeErr != nil {
warn.Warn(
cfgWarn.ReplicateCloseSend, masterAddr, closeErr,
)
return
}

for {
msg := &EntryMsg{}
if recvErr := stream.RecvMsg(msg); recvErr != nil {
if eof(recvErr) {
return
}
warn.Warn(cfgWarn.ReplicateRecv, masterAddr, recvErr)
return
}
entry := Entry{
Expand All @@ -118,6 +139,16 @@ func replicateOnce(
Timestamp: time.Unix(msg.Timestamp, 0),
Sequence: msg.Sequence,
}
_, _ = store.Append([]Entry{entry})
if _, appendErr := store.Append(
[]Entry{entry},
); appendErr != nil {
warn.Warn(
cfgWarn.ReplicateAppend,
entry.ID, entry.Sequence, appendErr,
)
// Keep consuming: the next entry may succeed,
// and aborting the loop on a single bad write
// would silently drop everything queued behind it.
}
}
}
48 changes: 48 additions & 0 deletions internal/hub/replicate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// / ctx: https://ctx.ist
// ,'`./ do you remember?
// `.,'\
// \ Copyright 2026-present Context contributors.
// SPDX-License-Identifier: Apache-2.0

package hub

import (
"bytes"
"context"
"strings"
"testing"

"github.com/ActiveMemory/ctx/internal/log/warn"
)

// TestReplicateOnce_WarnsOnDialError verifies that
// replicateOnce emits a warning via [warn.Warn] when the
// gRPC NewClient call fails on a malformed target. The "%"
// literal trips grpc.NewClient's URL parser ("invalid URL
// escape") before any network I/O - the most reliable way
// to hit the dial-error branch without standing up a
// listener. Regression guard for ActiveMemory/ctx#100:
// pre-fix, dial failures returned silently and the
// follower would loop forever with no operator-visible
// signal that the master address was unreachable or
// malformed.
func TestReplicateOnce_WarnsOnDialError(t *testing.T) {
store, storeErr := NewStore(t.TempDir())
if storeErr != nil {
t.Fatal(storeErr)
}

var buf bytes.Buffer
restore := warn.SetSinkForTesting(&buf)
defer restore()

replicateOnce(context.Background(), "%", store, "token")

got := buf.String()
if !strings.Contains(got, "replication: dial %:") {
t.Errorf(
"warning output missing replication dial prefix; got %q",
got,
)
}
}
Loading