From ca689be582156d0ff78e4625e4d3d5891b5e8c99 Mon Sep 17 00:00:00 2001 From: acmore Date: Mon, 13 Apr 2026 13:09:35 +0800 Subject: [PATCH 1/6] fix(sync): preserve peer and mesh folder membership --- internal/cli/mesh.go | 67 ++++++- internal/cli/syncthing.go | 183 +++++++++++++++---- internal/cli/syncthing_test.go | 316 ++++++++++++++++++++++++++++++++- 3 files changed, 526 insertions(+), 40 deletions(-) diff --git a/internal/cli/mesh.go b/internal/cli/mesh.go index 8d9b071..8b15ceb 100644 --- a/internal/cli/mesh.go +++ b/internal/cli/mesh.go @@ -290,7 +290,11 @@ func configureSyncthingMeshHub(ctx context.Context, base, key, hubDeviceID strin fm["path"] = folderPath fm["type"] = "sendreceive" fm["markerName"] = "." - fm["devices"] = folderDevices + mergedDevices, err := syncthingMergeMeshHubFolderDevices(fm["devices"], devices, hubDeviceID, receiverIDs) + if err != nil { + return err + } + fm["devices"] = mergedDevices applyManagedSyncthingFolderDefaults(fm, 60, 1, false) filteredFolders = append(filteredFolders, fm) foundFolder = true @@ -315,6 +319,67 @@ func configureSyncthingMeshHub(ctx context.Context, base, key, hubDeviceID strin return syncthingSetConfig(ctx, base, key, cfg) } +func syncthingMergeMeshHubFolderDevices(existingFolderDevices, devices any, hubDeviceID string, receiverIDs []string) ([]any, error) { + deviceEntries, err := syncthingObjectArray(map[string]any{"devices": devices}, "devices") + if err != nil { + return nil, err + } + deviceNames := make(map[string]string, len(deviceEntries)) + for _, d := range deviceEntries { + m, err := syncthingObjectMap(d, "devices") + if err != nil { + return nil, err + } + deviceNames[asString(m["deviceID"])] = asString(m["name"]) + } + + merged := make([]any, 0, 1+len(receiverIDs)+1) + merged = append(merged, map[string]any{"deviceID": hubDeviceID}) + receiverSet := make(map[string]struct{}, len(receiverIDs)) + for _, id := range receiverIDs { + receiverSet[id] = struct{}{} + } + + folderEntries, ok := existingFolderDevices.([]any) + if ok { + seen := map[string]struct{}{hubDeviceID: {}} + for _, d := range folderEntries { + m, err := syncthingObjectMap(d, "folder devices") + if err != nil { + return nil, err + } + id := asString(m["deviceID"]) + if id == "" { + continue + } + if _, exists := seen[id]; exists { + continue + } + if _, isReceiver := receiverSet[id]; isReceiver { + continue + } + if deviceNames[id] == "okdev-mesh-receiver" { + continue + } + seen[id] = struct{}{} + merged = append(merged, map[string]any{"deviceID": id}) + } + for _, id := range receiverIDs { + if _, exists := seen[id]; exists { + continue + } + seen[id] = struct{}{} + merged = append(merged, map[string]any{"deviceID": id}) + } + return merged, nil + } + + for _, id := range receiverIDs { + merged = append(merged, map[string]any{"deviceID": id}) + } + return merged, nil +} + // configureAndWaitMeshReceiver configures a single receiver pod's syncthing // to peer with the hub and waits for initial sync to complete. func configureAndWaitMeshReceiver(ctx context.Context, opts *Options, k *kube.Client, namespace string, pod kube.PodSummary, recvKey, recvDeviceID, hubBase, hubKey, hubDeviceID, hubAddr, folderID, workspaceMountPath string, timeout time.Duration) meshReceiverStatus { diff --git a/internal/cli/syncthing.go b/internal/cli/syncthing.go index 39aa6d8..858d20d 100644 --- a/internal/cli/syncthing.go +++ b/internal/cli/syncthing.go @@ -37,6 +37,7 @@ import ( const ( syncthingContainerName = "okdev-sidecar" localSyncthingGUIAddr = "127.0.0.1:0" + managedSyncthingPeer = "okdev-peer" ) // defaultSyncExcludes are written to a starter .stignore when one does not @@ -57,24 +58,24 @@ func runSyncthingSync(cmd *cobra.Command, opts *Options, cfg *config.DevEnvironm return fmt.Errorf("syncthing engine currently supports exactly one sync path mapping") } - ctx, cancel := context.WithTimeout(cmd.Context(), syncthingBootstrapTimeout) - defer cancel() - localBinary, err := syncthing.EnsureBinary(ctx, cfg.Spec.Sync.Syncthing.Version, cfg.Spec.Sync.Syncthing.AutoInstallEnabled()) + bootstrapCtx, cancelBootstrap, runtimeCtx := syncthingBootstrapAndRuntimeContexts(cmd.Context()) + defer cancelBootstrap() + localBinary, err := syncthing.EnsureBinary(bootstrapCtx, cfg.Spec.Sync.Syncthing.Version, cfg.Spec.Sync.Syncthing.AutoInstallEnabled()) if err != nil { return fmt.Errorf("prepare local syncthing binary: %w", err) } - target, err := resolveTargetRef(cmd.Context(), opts, cfg, namespace, sessionName, k) + target, err := resolveTargetRef(bootstrapCtx, opts, cfg, namespace, sessionName, k) if err != nil { return err } pod := target.PodName - if _, err := execInSyncthingContainer(ctx, k, namespace, pod, "command -v syncthing >/dev/null 2>&1"); err != nil { - refreshed, refreshErr := resolveTargetRef(cmd.Context(), opts, cfg, namespace, sessionName, k) + if _, err := execInSyncthingContainer(bootstrapCtx, k, namespace, pod, "command -v syncthing >/dev/null 2>&1"); err != nil { + refreshed, refreshErr := resolveTargetRef(bootstrapCtx, opts, cfg, namespace, sessionName, k) if refreshErr == nil && strings.TrimSpace(refreshed.PodName) != "" && refreshed.PodName != pod { pod = refreshed.PodName target = refreshed - if _, retryErr := execInSyncthingContainer(ctx, k, namespace, pod, "command -v syncthing >/dev/null 2>&1"); retryErr == nil { + if _, retryErr := execInSyncthingContainer(bootstrapCtx, k, namespace, pod, "command -v syncthing >/dev/null 2>&1"); retryErr == nil { err = nil } else { err = retryErr @@ -96,41 +97,38 @@ func runSyncthingSync(cmd *cobra.Command, opts *Options, cfg *config.DevEnvironm if err := writeLocalSTIgnore(absLocal); err != nil { return err } - if _, err := execInSyncthingContainer(ctx, k, namespace, pod, fmt.Sprintf("mkdir -p %s", syncengine.ShellEscape(pair.Remote))); err != nil { - return err - } - if err := writeRemoteSTIgnoreInPod(ctx, k, namespace, pod, pair.Remote, cfg.Spec.Sync.RemoteIgnore); err != nil { + if _, err := execInSyncthingContainer(bootstrapCtx, k, namespace, pod, fmt.Sprintf("mkdir -p %s", syncengine.ShellEscape(pair.Remote))); err != nil { return err } localHome, err := localSyncthingHome(sessionName) if err != nil { return err } - cancelPF, localRemoteAPIBase, localRemotePeerAddr, err := startSyncthingPortForward(ctx, opts, namespace, pod) + cancelPF, localRemoteAPIBase, localRemotePeerAddr, err := startSyncthingPortForward(runtimeCtx, opts, namespace, pod) if err != nil { return err } defer cancelPF() - remoteKey, err := readRemoteSyncthingAPIKey(ctx, k, namespace, pod) + remoteKey, err := readRemoteSyncthingAPIKey(bootstrapCtx, k, namespace, pod) if err != nil { return err } - localBase, localKey, localPID, err := startAndWaitForLocalSyncthing(ctx, localBinary, localHome) + localBase, localKey, localPID, err := startAndWaitForLocalSyncthing(bootstrapCtx, localBinary, localHome) if err != nil { return err } remoteBase := localRemoteAPIBase - if err := waitSyncthingAPI(ctx, remoteBase, remoteKey, syncthingAPIReadyTimeout); err != nil { + if err := waitSyncthingAPI(bootstrapCtx, remoteBase, remoteKey, syncthingAPIReadyTimeout); err != nil { return fmt.Errorf("remote syncthing not ready: %w", err) } - localID, err := syncthingDeviceID(ctx, localBase, localKey) + localID, err := syncthingDeviceID(bootstrapCtx, localBase, localKey) if err != nil { return err } - remoteID, err := syncthingDeviceID(ctx, remoteBase, remoteKey) + remoteID, err := syncthingDeviceID(bootstrapCtx, remoteBase, remoteKey) if err != nil { return err } @@ -150,20 +148,23 @@ func runSyncthingSync(cmd *cobra.Command, opts *Options, cfg *config.DevEnvironm if mode == "two-phase" { // The bootstrap context has a short timeout (syncthingBootstrapTimeout). // Two-phase initial sync may take much longer, so use a dedicated context. - twoPhaseCtx, twoPhaseCancel := context.WithTimeout(context.Background(), initialSyncTimeout) + twoPhaseCtx, twoPhaseCancel := context.WithTimeout(runtimeCtx, initialSyncTimeout) defer twoPhaseCancel() - if err := runTwoPhaseInitialSync(twoPhaseCtx, cmd.OutOrStdout(), localBase, localKey, localID, remoteBase, remoteKey, remoteID, localRemotePeerAddr, folderID, absLocal, pair.Remote, syncCfg); err != nil { + if err := runTwoPhaseInitialSync(twoPhaseCtx, cmd.OutOrStdout(), localBase, localKey, localID, remoteBase, remoteKey, remoteID, localRemotePeerAddr, folderID, absLocal, pair.Remote, cfg.Spec.Sync.RemoteIgnore, syncCfg); err != nil { return fmt.Errorf("two-phase initial sync: %w", err) } mode = "bi" } else { folderTypeLocal, folderTypeRemote := folderTypesForMode(mode) - if err := configureSyncthingPeer(ctx, localBase, localKey, localID, remoteID, localRemotePeerAddr, folderID, absLocal, folderTypeLocal, syncCfg.rescanInterval, syncCfg.watcherDelay, false, syncCfg.relays, syncCfg.compression); err != nil { + if err := configureSyncthingPeer(bootstrapCtx, localBase, localKey, localID, remoteID, localRemotePeerAddr, folderID, absLocal, folderTypeLocal, syncCfg.rescanInterval, syncCfg.watcherDelay, false, syncCfg.relays, syncCfg.compression); err != nil { return fmt.Errorf("configure local syncthing: %w", err) } - if err := configureSyncthingPeer(ctx, remoteBase, remoteKey, remoteID, localID, "", folderID, pair.Remote, folderTypeRemote, syncCfg.rescanInterval, syncCfg.watcherDelay, false, syncCfg.relays, syncCfg.compression); err != nil { + if err := configureSyncthingPeer(bootstrapCtx, remoteBase, remoteKey, remoteID, localID, "", folderID, pair.Remote, folderTypeRemote, syncCfg.rescanInterval, syncCfg.watcherDelay, false, syncCfg.relays, syncCfg.compression); err != nil { return fmt.Errorf("configure remote syncthing: %w", err) } + if err := syncthingSetIgnores(bootstrapCtx, remoteBase, remoteKey, folderID, cfg.Spec.Sync.RemoteIgnore); err != nil { + return fmt.Errorf("configure remote syncthing ignores: %w", err) + } } if err := saveSyncthingConfigHash(sessionName, syncthingSessionConfigHash(cfg, absLocal, pair.Remote)); err != nil { @@ -188,7 +189,7 @@ func runSyncthingSync(cmd *cobra.Command, opts *Options, cfg *config.DevEnvironm defer signal.Stop(sigCh) checker := &liveSyncHealthChecker{ - ctx: ctx, + ctx: runtimeCtx, opts: opts, namespace: namespace, pod: pod, @@ -205,6 +206,11 @@ func runSyncthingSync(cmd *cobra.Command, opts *Options, cfg *config.DevEnvironm return nil } +func syncthingBootstrapAndRuntimeContexts(parent context.Context) (context.Context, context.CancelFunc, context.Context) { + bootstrapCtx, cancel := context.WithTimeout(parent, syncthingBootstrapTimeout) + return bootstrapCtx, cancel, parent +} + // syncHealthChecker abstracts the peer connectivity check and port-forward // restoration so the health loop can be tested without real network calls. type syncHealthChecker interface { @@ -491,6 +497,7 @@ func writeLocalSTIgnore(localPath string) error { func writeRemoteSTIgnoreInPod(ctx context.Context, k interface { ExecShInContainer(context.Context, string, string, string, string) ([]byte, error) + CopyToPodInContainer(context.Context, string, string, string, string, string) error }, namespace, pod, remotePath string, excludes []string) error { content, ok := buildSTIgnoreContent(excludes) if !ok { @@ -501,8 +508,25 @@ func writeRemoteSTIgnoreInPod(ctx context.Context, k interface { return fmt.Errorf("refusing to write remote .stignore at unsafe sync root %q", remotePath) } stignorePath := path.Join(remotePath, ".stignore") + tempPath := stignorePath + ".tmp" + tempFile, err := os.CreateTemp("", "okdev-remote-stignore-*.tmp") + if err != nil { + return fmt.Errorf("create temp .stignore file: %w", err) + } + tempLocalPath := tempFile.Name() + defer os.Remove(tempLocalPath) + if _, err := tempFile.WriteString(content); err != nil { + _ = tempFile.Close() + return fmt.Errorf("write temp .stignore file: %w", err) + } + if err := tempFile.Close(); err != nil { + return fmt.Errorf("close temp .stignore file: %w", err) + } + if err := k.CopyToPodInContainer(ctx, namespace, tempLocalPath, pod, syncthingContainerName, tempPath); err != nil { + return fmt.Errorf("copy remote .stignore: %w", err) + } esc := syncengine.ShellEscape - script := fmt.Sprintf("mkdir -p %s && printf %%s %s > %s", esc(remotePath), esc(content), esc(stignorePath)) + script := fmt.Sprintf("mkdir -p %s && mv %s %s", esc(remotePath), esc(tempPath), esc(stignorePath)) if _, err := execInSyncthingContainer(ctx, k, namespace, pod, script); err != nil { return fmt.Errorf("write remote .stignore: %w", err) } @@ -1408,7 +1432,7 @@ type syncthingSyncConfig struct { // deletions before entering bidirectional mode. // 2. Reconfigure both sides to sendreceive and restart only if Syncthing // reports that the applied config requires it. -func runTwoPhaseInitialSync(ctx context.Context, out io.Writer, localBase, localKey, localID, remoteBase, remoteKey, remoteID, peerAddr, folderID, localPath, remotePath string, sc syncthingSyncConfig) error { +func runTwoPhaseInitialSync(ctx context.Context, out io.Writer, localBase, localKey, localID, remoteBase, remoteKey, remoteID, peerAddr, folderID, localPath, remotePath string, remoteIgnores []string, sc syncthingSyncConfig) error { // Phase 1: sendonly + ignoreDelete on local, receiveonly on remote. fmt.Fprintln(out, "Phase 1: local-authoritative initial sync …") if err := configureSyncthingPeer(ctx, localBase, localKey, localID, remoteID, peerAddr, folderID, localPath, "sendonly", sc.rescanInterval, sc.watcherDelay, true, sc.relays, sc.compression); err != nil { @@ -1417,6 +1441,9 @@ func runTwoPhaseInitialSync(ctx context.Context, out io.Writer, localBase, local if err := configureSyncthingPeer(ctx, remoteBase, remoteKey, remoteID, localID, "", folderID, remotePath, "receiveonly", sc.rescanInterval, sc.watcherDelay, false, sc.relays, sc.compression); err != nil { return fmt.Errorf("configure remote (phase 1): %w", err) } + if err := syncthingSetIgnores(ctx, remoteBase, remoteKey, folderID, remoteIgnores); err != nil { + return fmt.Errorf("configure remote ignores (phase 1): %w", err) + } // Poll until remote is in sync, calling override on every tick. deadline := time.Now().Add(initialSyncTimeout) @@ -1497,6 +1524,9 @@ func runTwoPhaseInitialSync(ctx context.Context, out io.Writer, localBase, local if err := configureSyncthingPeer(ctx, remoteBase, remoteKey, remoteID, localID, "", folderID, remotePath, "sendreceive", sc.rescanInterval, sc.watcherDelay, false, sc.relays, sc.compression); err != nil { return fmt.Errorf("configure remote (phase 2): %w", err) } + if err := syncthingSetIgnores(ctx, remoteBase, remoteKey, folderID, remoteIgnores); err != nil { + return fmt.Errorf("configure remote ignores (phase 2): %w", err) + } localRestartRequired, err := syncthingRestartRequired(ctx, localBase, localKey) if err != nil { @@ -1602,31 +1632,40 @@ func configureSyncthingPeer(ctx context.Context, base, key, selfID, peerID, peer } addresses := syncthingDeviceAddresses(peerAddr) foundDevice := false - for i, d := range devices { + filteredDevices := make([]any, 0, len(devices)) + for _, d := range devices { m, err := syncthingObjectMap(d, "devices") if err != nil { return err } - if asString(m["deviceID"]) == peerID { + deviceID := asString(m["deviceID"]) + if deviceID != peerID && isManagedSyncthingPeerDevice(m, selfID) { + continue + } + if deviceID == peerID { + if foundDevice { + continue + } m["addresses"] = addresses m["compression"] = compressionMode applyManagedSyncthingDeviceDefaults(m) - devices[i] = m foundDevice = true - break + filteredDevices = append(filteredDevices, m) + continue } + filteredDevices = append(filteredDevices, m) } if !foundDevice { device := map[string]any{ "deviceID": peerID, - "name": "okdev-peer", + "name": managedSyncthingPeer, "addresses": addresses, "compression": compressionMode, } applyManagedSyncthingDeviceDefaults(device) - devices = append(devices, device) + filteredDevices = append(filteredDevices, device) } - cfg["devices"] = devices + cfg["devices"] = filteredDevices folders, err := syncthingObjectArray(cfg, "folders") if err != nil { @@ -1646,10 +1685,11 @@ func configureSyncthingPeer(ctx context.Context, base, key, selfID, peerID, peer fm["path"] = folderPath fm["type"] = folderType fm["markerName"] = "." - fm["devices"] = []any{ - map[string]any{"deviceID": selfID}, - map[string]any{"deviceID": peerID}, + mergedDevices, err := syncthingMergeFolderDevices(fm["devices"], filteredDevices, selfID, peerID) + if err != nil { + return err } + fm["devices"] = mergedDevices applyManagedSyncthingFolderDefaults(fm, rescanIntervalSeconds, watcherDelaySeconds, ignoreDelete) filteredFolders = append(filteredFolders, fm) foundFolder = true @@ -1680,6 +1720,62 @@ func configureSyncthingPeer(ctx context.Context, base, key, selfID, peerID, peer return nil } +func syncthingMergeFolderDevices(existingFolderDevices, devices any, selfID, peerID string) ([]any, error) { + deviceEntries, err := syncthingObjectArray(map[string]any{"devices": devices}, "devices") + if err != nil { + return nil, err + } + deviceNames := make(map[string]string, len(deviceEntries)) + for _, d := range deviceEntries { + m, err := syncthingObjectMap(d, "devices") + if err != nil { + return nil, err + } + deviceNames[asString(m["deviceID"])] = asString(m["name"]) + } + + merged := []any{ + map[string]any{"deviceID": selfID}, + } + seen := map[string]struct{}{selfID: {}} + if strings.TrimSpace(peerID) != "" && peerID != selfID { + merged = append(merged, map[string]any{"deviceID": peerID}) + seen[peerID] = struct{}{} + } + + folderEntries, ok := existingFolderDevices.([]any) + if !ok { + return merged, nil + } + for _, d := range folderEntries { + m, err := syncthingObjectMap(d, "folder devices") + if err != nil { + return nil, err + } + id := asString(m["deviceID"]) + if id == "" { + continue + } + if _, exists := seen[id]; exists { + continue + } + if deviceNames[id] != "okdev-mesh-receiver" { + continue + } + seen[id] = struct{}{} + merged = append(merged, map[string]any{"deviceID": id}) + } + return merged, nil +} + +func isManagedSyncthingPeerDevice(device map[string]any, selfID string) bool { + deviceID := asString(device["deviceID"]) + if deviceID == "" || deviceID == selfID { + return false + } + return asString(device["name"]) == managedSyncthingPeer +} + // updateSyncthingDeviceAddress updates only the peer address for an existing // device in the local Syncthing config. This is used during self-healing to // point Syncthing at a new port-forward endpoint without touching folder or @@ -1874,6 +1970,25 @@ func syncthingPost(ctx context.Context, base, key, path string, body []byte) err return nil } +func syncthingSetIgnores(ctx context.Context, base, key, folderID string, ignores []string) error { + trimmed := make([]string, 0, len(ignores)) + for _, ignore := range ignores { + ignore = strings.TrimSpace(ignore) + if ignore == "" { + continue + } + trimmed = append(trimmed, ignore) + } + payload := map[string][]string{"ignore": trimmed} + body, err := json.Marshal(payload) + if err != nil { + return err + } + path := fmt.Sprintf("/rest/db/ignores?folder=%s", url.QueryEscape(folderID)) + _, err = syncthingAPIRequestWithContext(ctx, http.MethodPost, base, key, path, body, "application/json") + return err +} + func syncthingAPIRequestWithContext(ctx context.Context, method, base, key, path string, body []byte, contentType string) ([]byte, error) { var reqBody io.Reader if len(body) > 0 { diff --git a/internal/cli/syncthing_test.go b/internal/cli/syncthing_test.go index 8684957..a6cbdf9 100644 --- a/internal/cli/syncthing_test.go +++ b/internal/cli/syncthing_test.go @@ -319,15 +319,23 @@ func TestWriteRemoteSTIgnoreInPodWritesConfiguredPatterns(t *testing.T) { if rec.namespace != "default" || rec.pod != "pod-a" || rec.container != syncthingContainerName { t.Fatalf("unexpected exec target namespace=%q pod=%q container=%q", rec.namespace, rec.pod, rec.container) } + if rec.copyDest != "/workspace/.stignore.tmp" { + t.Fatalf("expected temp copy destination, got %q", rec.copyDest) + } + if rec.copyBody != "profiles/\n*.prof\n" { + t.Fatalf("unexpected copied .stignore content %q", rec.copyBody) + } for _, want := range []string{ "mkdir -p '/workspace'", - "printf %s 'profiles/\n*.prof\n'", - "> '/workspace/.stignore'", + "mv '/workspace/.stignore.tmp' '/workspace/.stignore'", } { if !strings.Contains(rec.script, want) { t.Fatalf("expected remote .stignore script to contain %q, got %q", want, rec.script) } } + if strings.Contains(rec.script, "profiles/") || strings.Contains(rec.script, "*.prof") { + t.Fatalf("expected remote payload to be streamed, script was %q", rec.script) + } } func TestWriteRemoteSTIgnoreInPodSkipsEmptyPatterns(t *testing.T) { @@ -387,6 +395,9 @@ type syncthingExecRecorder struct { pod string container string script string + copyPath string + copyDest string + copyBody string out []byte err error } @@ -399,6 +410,20 @@ func (r *syncthingExecRecorder) ExecShInContainer(_ context.Context, namespace, return r.out, r.err } +func (r *syncthingExecRecorder) CopyToPodInContainer(_ context.Context, namespace, localPath, pod, container, remotePath string) error { + r.namespace = namespace + r.pod = pod + r.container = container + r.copyPath = localPath + r.copyDest = remotePath + body, err := os.ReadFile(localPath) + if err != nil { + return err + } + r.copyBody = string(body) + return r.err +} + func TestParseLocalSyncthingAPIBaseUsesLatestLoopbackAddress(t *testing.T) { logText := strings.Join([]string{ "INFO: GUI and API listening on 127.0.0.1:49100", @@ -694,7 +719,7 @@ func TestRunTwoPhaseInitialSync(t *testing.T) { compression: false, } - err := runTwoPhaseInitialSync(ctx, &buf, localSrv.URL, "k", "LOCAL", remoteSrv.URL, "k", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "/tmp/remote", sc) + err := runTwoPhaseInitialSync(ctx, &buf, localSrv.URL, "k", "LOCAL", remoteSrv.URL, "k", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "/tmp/remote", nil, sc) if err != nil { t.Fatal(err) } @@ -817,7 +842,7 @@ func TestRunTwoPhaseInitialSyncWaitsForDeletionConvergence(t *testing.T) { var buf bytes.Buffer sc := syncthingSyncConfig{rescanInterval: 300, watcherDelay: 1} - err := runTwoPhaseInitialSync(context.Background(), &buf, localSrv.URL, "k", "LOCAL", remoteSrv.URL, "k", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "/tmp/remote", sc) + err := runTwoPhaseInitialSync(context.Background(), &buf, localSrv.URL, "k", "LOCAL", remoteSrv.URL, "k", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "/tmp/remote", nil, sc) if err != nil { t.Fatal(err) } @@ -887,7 +912,7 @@ func TestRunTwoPhaseInitialSyncWaitsForPeerConnection(t *testing.T) { var buf bytes.Buffer sc := syncthingSyncConfig{rescanInterval: 300, watcherDelay: 1} - err := runTwoPhaseInitialSync(context.Background(), &buf, localSrv.URL, "k", "LOCAL", remoteSrv.URL, "k", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "/tmp/remote", sc) + err := runTwoPhaseInitialSync(context.Background(), &buf, localSrv.URL, "k", "LOCAL", remoteSrv.URL, "k", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "/tmp/remote", nil, sc) if err != nil { t.Fatal(err) } @@ -1125,6 +1150,41 @@ func TestSyncthingAPIRequestWithContextRejectsErrorStatus(t *testing.T) { } } +func TestSyncthingSetIgnoresPostsPatterns(t *testing.T) { + var gotBody []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Fatalf("unexpected method %s", r.Method) + } + if r.URL.Path != "/rest/db/ignores" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + if got := r.URL.Query().Get("folder"); got != "okdev-test" { + t.Fatalf("unexpected folder query %q", got) + } + if got := r.Header.Get("X-API-Key"); got != "secret" { + t.Fatalf("unexpected API key %q", got) + } + if got := r.Header.Get("Content-Type"); got != "application/json" { + t.Fatalf("unexpected content type %q", got) + } + var err error + gotBody, err = io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + _, _ = w.Write([]byte(`{"ignore":["profiles/","*.prof"]}`)) + })) + defer srv.Close() + + if err := syncthingSetIgnores(context.Background(), srv.URL, "secret", "okdev-test", []string{"profiles/", "*.prof"}); err != nil { + t.Fatal(err) + } + if string(gotBody) != `{"ignore":["profiles/","*.prof"]}` { + t.Fatalf("unexpected request body %s", gotBody) + } +} + func TestConfigureSyncthingPeerAddsAndUpdatesConfig(t *testing.T) { cfg := map[string]any{ "devices": []any{}, @@ -1279,6 +1339,170 @@ func TestConfigureSyncthingPeerAddsAndUpdatesConfig(t *testing.T) { } } +func TestConfigureSyncthingPeerPrunesStaleManagedPeerDevices(t *testing.T) { + cfg := map[string]any{ + "devices": []any{ + map[string]any{ + "deviceID": "STALE-1", + "name": "okdev-peer", + "addresses": []any{"tcp://127.0.0.1:22001"}, + }, + map[string]any{ + "deviceID": "KEEP-MANUAL", + "name": "teammate-laptop", + "addresses": []any{"dynamic"}, + }, + map[string]any{ + "deviceID": "STALE-2", + "name": "okdev-peer", + "addresses": []any{"tcp://127.0.0.1:22002"}, + }, + }, + "folders": []any{}, + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/rest/config": + _ = json.NewEncoder(w).Encode(cfg) + case r.Method == http.MethodPut && r.URL.Path == "/rest/config": + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + if err := json.Unmarshal(body, &cfg); err != nil { + t.Fatal(err) + } + w.WriteHeader(http.StatusOK) + default: + t.Fatalf("unexpected request %s %s", r.Method, r.URL.Path) + } + })) + defer srv.Close() + + if err := configureSyncthingPeer(context.Background(), srv.URL, "k", "LOCAL", "REMOTE", "tcp://127.0.0.1:22000", "okdev-test", "/tmp/local", "sendreceive", 300, 0, false, false, false); err != nil { + t.Fatal(err) + } + + devices, err := syncthingObjectArray(cfg, "devices") + if err != nil { + t.Fatal(err) + } + if len(devices) != 2 { + t.Fatalf("expected current managed peer plus manual device, got %d devices", len(devices)) + } + + gotIDs := make([]string, 0, len(devices)) + for _, d := range devices { + m, err := syncthingObjectMap(d, "devices") + if err != nil { + t.Fatal(err) + } + gotIDs = append(gotIDs, asString(m["deviceID"])) + } + wantIDs := []string{"KEEP-MANUAL", "REMOTE"} + if strings.Join(gotIDs, ",") != strings.Join(wantIDs, ",") { + t.Fatalf("unexpected device IDs %v, want %v", gotIDs, wantIDs) + } +} + +func TestConfigureSyncthingPeerPreservesMeshReceiverFolderPeers(t *testing.T) { + cfg := map[string]any{ + "devices": []any{ + map[string]any{ + "deviceID": "LOCAL", + "name": managedSyncthingPeer, + "addresses": []any{"dynamic"}, + }, + map[string]any{ + "deviceID": "RECV", + "name": "okdev-mesh-receiver", + "addresses": []any{"tcp://10.0.0.2:22000"}, + }, + }, + "folders": []any{ + map[string]any{ + "id": "okdev-test", + "path": "/workspace", + "type": "sendreceive", + "devices": []any{ + map[string]any{"deviceID": "REMOTE"}, + map[string]any{"deviceID": "LOCAL"}, + map[string]any{"deviceID": "RECV"}, + }, + }, + }, + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/rest/config": + _ = json.NewEncoder(w).Encode(cfg) + case r.Method == http.MethodPut && r.URL.Path == "/rest/config": + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + if err := json.Unmarshal(body, &cfg); err != nil { + t.Fatal(err) + } + w.WriteHeader(http.StatusOK) + default: + t.Fatalf("unexpected request %s %s", r.Method, r.URL.Path) + } + })) + defer srv.Close() + + if err := configureSyncthingPeer(context.Background(), srv.URL, "k", "REMOTE", "LOCAL", "", "okdev-test", "/workspace", "sendreceive", 300, 1, false, false, false); err != nil { + t.Fatal(err) + } + + folders, err := syncthingObjectArray(cfg, "folders") + if err != nil { + t.Fatal(err) + } + folder, err := syncthingObjectMap(folders[0], "folders") + if err != nil { + t.Fatal(err) + } + folderDevices, ok := folder["devices"].([]any) + if !ok { + t.Fatalf("expected folder devices array, got %#v", folder["devices"]) + } + gotIDs := make([]string, 0, len(folderDevices)) + for _, d := range folderDevices { + m, err := syncthingObjectMap(d, "folder devices") + if err != nil { + t.Fatal(err) + } + gotIDs = append(gotIDs, asString(m["deviceID"])) + } + wantIDs := []string{"REMOTE", "LOCAL", "RECV"} + if strings.Join(gotIDs, ",") != strings.Join(wantIDs, ",") { + t.Fatalf("unexpected folder device IDs %v, want %v", gotIDs, wantIDs) + } +} + +func TestSyncthingBootstrapAndRuntimeContexts(t *testing.T) { + parent := context.Background() + bootstrapCtx, cancel, runtimeCtx := syncthingBootstrapAndRuntimeContexts(parent) + t.Cleanup(cancel) + + if runtimeCtx != parent { + t.Fatal("expected runtime context to be the parent command context") + } + + if _, ok := runtimeCtx.Deadline(); ok { + t.Fatal("expected runtime context to avoid the bootstrap deadline") + } + + deadline, ok := bootstrapCtx.Deadline() + if !ok { + t.Fatal("expected bootstrap context to have a deadline") + } + if remaining := time.Until(deadline); remaining <= 0 || remaining > syncthingBootstrapTimeout { + t.Fatalf("unexpected bootstrap deadline remaining %s", remaining) + } +} + func TestConfigureSyncthingMeshHubKeepsAllReceiversInFolder(t *testing.T) { cfg := map[string]any{ "devices": []any{ @@ -1408,6 +1632,88 @@ func TestWriteSTIgnoreDefaultExcludes(t *testing.T) { } } +func TestConfigureSyncthingMeshHubPreservesExistingNonReceiverFolderPeers(t *testing.T) { + cfg := map[string]any{ + "devices": []any{ + map[string]any{ + "deviceID": "HUB", + "name": "hub", + "addresses": []any{"dynamic"}, + }, + map[string]any{ + "deviceID": "LOCAL", + "name": managedSyncthingPeer, + "addresses": []any{"tcp://127.0.0.1:22000"}, + }, + }, + "folders": []any{ + map[string]any{ + "id": "okdev-test", + "path": "/workspace", + "type": "sendreceive", + "devices": []any{ + map[string]any{"deviceID": "HUB"}, + map[string]any{"deviceID": "LOCAL"}, + }, + }, + }, + "options": map[string]any{}, + } + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method + " " + r.URL.Path { + case http.MethodGet + " /rest/config": + if err := json.NewEncoder(w).Encode(cfg); err != nil { + t.Fatal(err) + } + case http.MethodPut + " /rest/config": + if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil { + t.Fatal(err) + } + w.WriteHeader(http.StatusOK) + default: + t.Fatalf("unexpected request %s %s", r.Method, r.URL.Path) + } + })) + defer srv.Close() + + err := configureSyncthingMeshHub(context.Background(), srv.URL, "k", "HUB", map[string]string{ + "RECV2": "tcp://10.0.0.2:22000", + "RECV1": "tcp://10.0.0.1:22000", + }, "okdev-test", "/workspace") + if err != nil { + t.Fatal(err) + } + + folders, err := syncthingObjectArray(cfg, "folders") + if err != nil { + t.Fatal(err) + } + if len(folders) != 1 { + t.Fatalf("expected 1 folder, got %d", len(folders)) + } + folder, err := syncthingObjectMap(folders[0], "folders") + if err != nil { + t.Fatal(err) + } + folderDevices, ok := folder["devices"].([]any) + if !ok { + t.Fatalf("expected folder devices array, got %#v", folder["devices"]) + } + gotIDs := make([]string, 0, len(folderDevices)) + for _, d := range folderDevices { + m, err := syncthingObjectMap(d, "folder devices") + if err != nil { + t.Fatal(err) + } + gotIDs = append(gotIDs, asString(m["deviceID"])) + } + wantIDs := []string{"HUB", "LOCAL", "RECV1", "RECV2"} + if strings.Join(gotIDs, ",") != strings.Join(wantIDs, ",") { + t.Fatalf("unexpected folder device IDs %v, want %v", gotIDs, wantIDs) + } +} + func TestWriteLocalSTIgnorePropagatesStatError(t *testing.T) { base := t.TempDir() filePath := filepath.Join(base, "not-a-dir") From decb398c9591cb9322440bc50661aa28f9519107 Mon Sep 17 00:00:00 2001 From: acmore Date: Mon, 13 Apr 2026 16:24:07 +0800 Subject: [PATCH 2/6] fix(list): default to all namespaces for owner --- internal/cli/list.go | 10 +++-- internal/cli/status_list_test.go | 69 ++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/internal/cli/list.go b/internal/cli/list.go index 6dcb087..46933a6 100644 --- a/internal/cli/list.go +++ b/internal/cli/list.go @@ -18,11 +18,11 @@ func newListCmd(opts *Options) *cobra.Command { cmd := &cobra.Command{ Use: "list", Short: "List dev sessions", - Example: ` # List your sessions in the current namespace + Example: ` # List your sessions across all namespaces okdev list - # List across all namespaces - okdev list --all-namespaces + # Narrow to a specific namespace + okdev list --namespace proj-tango # List all users' sessions okdev list --all-users @@ -31,6 +31,8 @@ func newListCmd(opts *Options) *cobra.Command { okdev list --output json`, RunE: func(cmd *cobra.Command, args []string) error { cc := &commandContext{opts: opts} + explicitNamespace := strings.TrimSpace(opts.Namespace) != "" + effectiveAllNamespaces := allNamespaces || !explicitNamespace ns := opts.Namespace activeSession, activeErr := session.LoadActiveSession() if activeErr != nil { @@ -57,7 +59,7 @@ func newListCmd(opts *Options) *cobra.Command { if !allUsers { label = label + "," + ownerLabelSelector(opts) } - pods, err := cc.kube.ListPods(ctx, cc.namespace, allNamespaces, label) + pods, err := cc.kube.ListPods(ctx, cc.namespace, effectiveAllNamespaces, label) if err != nil { return err } diff --git a/internal/cli/status_list_test.go b/internal/cli/status_list_test.go index a12317f..bb0bb5c 100644 --- a/internal/cli/status_list_test.go +++ b/internal/cli/status_list_test.go @@ -80,6 +80,75 @@ func TestNewListCmdOutputsJSON(t *testing.T) { } } +func TestNewListCmdDefaultsToAllNamespacesForCurrentOwner(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/api/v1/pods": + _, _ = io.WriteString(w, `{"kind":"PodList","apiVersion":"v1","items":[{"metadata":{"namespace":"proj-tango","name":"okdev-sess-a","creationTimestamp":"2026-03-29T00:00:00Z","labels":{"okdev.io/session":"sess-a","okdev.io/owner":"alice","okdev.io/workload-type":"pod"}},"status":{"phase":"Running","containerStatuses":[{"name":"dev","ready":true,"restartCount":1}]}}]}`) + case "/api/v1/namespaces/default/pods": + _, _ = io.WriteString(w, `{"kind":"PodList","apiVersion":"v1","items":[]}`) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + t.Setenv("KUBECONFIG", writeCLITLSTestKubeconfig(t, server)) + cfgPath := writeCLIConfig(t, "default") + opts := &Options{ConfigPath: cfgPath, Context: "dev", Output: "json", Owner: "alice"} + cmd := newListCmd(opts) + var out bytes.Buffer + cmd.SetOut(&out) + cmd.SetErr(io.Discard) + + if err := cmd.Execute(); err != nil { + t.Fatalf("list execute: %v", err) + } + + var rows []map[string]any + if err := json.Unmarshal(out.Bytes(), &rows); err != nil { + t.Fatalf("json unmarshal: %v\n%s", err, out.String()) + } + if len(rows) != 1 || rows[0]["session"] != "sess-a" || rows[0]["namespace"] != "proj-tango" { + t.Fatalf("unexpected list rows: %#v", rows) + } +} + +func TestNewListCmdNamespaceOverrideNarrowsResults(t *testing.T) { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.URL.Path { + case "/api/v1/namespaces/demo/pods": + _, _ = io.WriteString(w, `{"kind":"PodList","apiVersion":"v1","items":[{"metadata":{"namespace":"demo","name":"okdev-sess-a","creationTimestamp":"2026-03-29T00:00:00Z","labels":{"okdev.io/session":"sess-a","okdev.io/owner":"alice","okdev.io/workload-type":"pod"}},"status":{"phase":"Running","containerStatuses":[{"name":"dev","ready":true,"restartCount":1}]}}]}`) + case "/api/v1/pods": + _, _ = io.WriteString(w, `{"kind":"PodList","apiVersion":"v1","items":[{"metadata":{"namespace":"proj-tango","name":"okdev-sess-b","creationTimestamp":"2026-03-29T00:00:00Z","labels":{"okdev.io/session":"sess-b","okdev.io/owner":"alice","okdev.io/workload-type":"pod"}},"status":{"phase":"Running","containerStatuses":[{"name":"dev","ready":true,"restartCount":1}]}}]}`) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + t.Setenv("KUBECONFIG", writeCLITLSTestKubeconfig(t, server)) + opts := &Options{Namespace: "demo", Context: "dev", Output: "json", Owner: "alice"} + cmd := newListCmd(opts) + var out bytes.Buffer + cmd.SetOut(&out) + cmd.SetErr(io.Discard) + + if err := cmd.Execute(); err != nil { + t.Fatalf("list execute: %v", err) + } + + var rows []map[string]any + if err := json.Unmarshal(out.Bytes(), &rows); err != nil { + t.Fatalf("json unmarshal: %v\n%s", err, out.String()) + } + if len(rows) != 1 || rows[0]["session"] != "sess-a" || rows[0]["namespace"] != "demo" { + t.Fatalf("unexpected list rows: %#v", rows) + } +} + func TestNewStatusCmdDetailsRequiresSingleSession(t *testing.T) { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") From bd7e3f300cda4977798a8e8138899e03ef244105 Mon Sep 17 00:00:00 2001 From: acmore Date: Mon, 13 Apr 2026 17:17:32 +0800 Subject: [PATCH 3/6] test: cover pytorchjob pvc subpath workspace sync --- scripts/e2e_kind_pytorchjob.sh | 184 ++++++++++++++++++++++++++++++++- 1 file changed, 183 insertions(+), 1 deletion(-) diff --git a/scripts/e2e_kind_pytorchjob.sh b/scripts/e2e_kind_pytorchjob.sh index f436bfd..17e4593 100755 --- a/scripts/e2e_kind_pytorchjob.sh +++ b/scripts/e2e_kind_pytorchjob.sh @@ -8,6 +8,9 @@ SIDECAR_IMAGE="${SIDECAR_IMAGE:-okdev-sidecar:v0.0.0-e2e}" SESSION_NAME="${SESSION_NAME:-e2e-ptjob}" NAMESPACE="${NAMESPACE:-default}" PVC_NAME="${PVC_NAME:-${SESSION_NAME}-workspace}" +PVC_WORKSPACE_SUBPATH="${PVC_WORKSPACE_SUBPATH:-users/e2e/workspace}" +SECONDARY_PVC_MOUNT="${SECONDARY_PVC_MOUNT:-/proj-tango-pvc}" +PVC_OUTSIDE_WORKSPACE_DIR="${PVC_OUTSIDE_WORKSPACE_DIR:-shared-outside-workspace}" WORKDIR="$(make_workdir)" HOME_DIR="${HOME_DIR:-$WORKDIR/home}" CFG_PATH="$WORKDIR/.okdev/okdev.yaml" @@ -15,6 +18,7 @@ SYNC_DIR="$WORKDIR/workspace" ORIG_HOME="${HOME}" ORIG_KUBECONFIG="${KUBECONFIG:-}" KUBECONFIG_PATH="$HOME_DIR/.kube/config" +PVC_INIT_POD="${SESSION_NAME}-pvc-init" mkdir -p "$HOME_DIR" "$SYNC_DIR" "$HOME_DIR/.kube" if [[ -n "$ORIG_KUBECONFIG" ]]; then @@ -37,6 +41,7 @@ cleanup() { cat "$HOME_DIR/.okdev/sessions/${SESSION_NAME}/syncthing/local.log" 2>/dev/null || true fi "$OKDEV_BIN" --config "$CFG_PATH" --session "$SESSION_NAME" down --yes >/dev/null 2>&1 || true + kubectl -n "$NAMESPACE" delete pod "$PVC_INIT_POD" >/dev/null 2>&1 || true kubectl -n "$NAMESPACE" delete pvc "$PVC_NAME" >/dev/null 2>&1 || true return "$status" } @@ -53,6 +58,26 @@ refresh_pytorchjob_pods() { -o jsonpath='{.items[*].metadata.name}') } +wait_for_pod_file_content() { + local pod="$1" + local path="$2" + local expected="$3" + local label="$4" + local content="" + + for attempt in $(seq 1 30); do + content=$(kubectl -n "$NAMESPACE" exec "$pod" -c pytorch -- sh -lc "cat '$path' 2>/dev/null || true") + if [[ "$content" == "$expected" ]]; then + echo "$label verified on attempt $attempt" + return 0 + fi + sleep 2 + done + + echo "ERROR: expected $label on pod $pod at $path, got '$content'" >&2 + return 1 +} + echo "Scaffolding PyTorchJob config via okdev init" cd "$WORKDIR" "$OKDEV_BIN" init \ @@ -71,7 +96,37 @@ MANIFEST_PATH="$WORKDIR/.okdev/pytorchjob.yaml" replace_all_in_file "$MANIFEST_PATH" 'name: dev' 'name: pytorch' replace_all_in_file "$MANIFEST_PATH" 'image: # TODO: replace with your image' 'image: ubuntu:22.04' replace_all_in_file "$MANIFEST_PATH" 'command: ["sleep", "infinity"]' 'command: ["sh", "-lc", "trap : TERM INT; while true; do sleep 3600; done"]' -perl -0pi -e 's/- name: workspace\n emptyDir: \{\}/- name: workspace\n persistentVolumeClaim:\n claimName: '"$PVC_NAME"'/g' "$MANIFEST_PATH" +python3 - <<'PY' "$MANIFEST_PATH" "$PVC_NAME" "$PVC_WORKSPACE_SUBPATH" "$SECONDARY_PVC_MOUNT" +import pathlib, sys + +path = pathlib.Path(sys.argv[1]) +pvc_name = sys.argv[2] +workspace_subpath = sys.argv[3] +secondary_mount = sys.argv[4] +text = path.read_text() + +old_mount = """- name: workspace + mountPath: /workspace""" +new_mount = f"""- name: workspace + mountPath: /workspace + subPath: {workspace_subpath} + - name: workspace + mountPath: {secondary_mount}""" +if old_mount not in text: + raise SystemExit("workspace mount not found") +text = text.replace(old_mount, new_mount) + +old_volume = """- name: workspace + emptyDir: {}""" +new_volume = f"""- name: workspace + persistentVolumeClaim: + claimName: {pvc_name}""" +if old_volume not in text: + raise SystemExit("workspace volume not found") +text = text.replace(old_volume, new_volume) + +path.write_text(text) +PY replace_all_in_file "$CFG_PATH" 'container: dev' 'container: pytorch' python3 - <<'PY' "$CFG_PATH" import pathlib, sys @@ -134,6 +189,51 @@ spec: storage: 1Gi EOF +echo "Preparing PVC subdirectories for /workspace subPath coverage" +kubectl -n "$NAMESPACE" delete pod "$PVC_INIT_POD" --ignore-not-found >/dev/null 2>&1 || true +kubectl -n "$NAMESPACE" apply -f - </dev/null || true) + if [[ "$PVC_INIT_PHASE" == "Succeeded" ]]; then + break + fi + if [[ "$PVC_INIT_PHASE" == "Failed" ]]; then + kubectl -n "$NAMESPACE" logs "$PVC_INIT_POD" >&2 || true + echo "ERROR: failed to initialize PVC subdirectories" >&2 + exit 1 + fi + sleep 1 +done +if [[ "$PVC_INIT_PHASE" != "Succeeded" ]]; then + kubectl -n "$NAMESPACE" logs "$PVC_INIT_POD" >&2 || true + echo "ERROR: timed out initializing PVC subdirectories" >&2 + exit 1 +fi +kubectl -n "$NAMESPACE" delete pod "$PVC_INIT_POD" >/dev/null 2>&1 || true + echo "Starting PyTorchJob smoke session (1 master + 2 workers)" "$OKDEV_BIN" --config "$CFG_PATH" --session "$SESSION_NAME" up --wait-timeout 5m @@ -172,6 +272,58 @@ if [[ "$SYNC_OK" != "true" ]]; then exit 1 fi +echo "Waiting for PyTorchJob pods before validating PVC mount layout" +PODS_READY=false +for i in $(seq 1 30); do + refresh_pytorchjob_pods + if [[ -n "${MASTER_POD:-}" ]] && [[ "$(echo "$WORKER_PODS" | wc -w | tr -d ' ')" -eq 2 ]]; then + PODS_READY=true + break + fi + sleep 2 +done +if [[ "$PODS_READY" != "true" ]]; then + echo "ERROR: expected 1 master pod and 2 worker pods before PVC mount verification" >&2 + exit 1 +fi + +echo "Verifying PVC subPath is visible through the full PVC mount on master" +wait_for_pod_file_content \ + "$MASTER_POD" \ + "$SECONDARY_PVC_MOUNT/$PVC_WORKSPACE_SUBPATH/hello.txt" \ + "hello from pytorchjob e2e" \ + "secondary PVC mount on master" + +echo "Verifying workers see the synced workspace through both mount paths" +for WPOD in $WORKER_PODS; do + wait_for_pod_file_content \ + "$WPOD" \ + "/workspace/hello.txt" \ + "hello from pytorchjob e2e" \ + "workspace mount on worker $WPOD" + wait_for_pod_file_content \ + "$WPOD" \ + "$SECONDARY_PVC_MOUNT/$PVC_WORKSPACE_SUBPATH/hello.txt" \ + "hello from pytorchjob e2e" \ + "secondary PVC mount on worker $WPOD" +done + +echo "Writing through the full PVC mount and verifying the /workspace subPath view" +kubectl -n "$NAMESPACE" exec "$MASTER_POD" -c pytorch -- sh -lc \ + "echo shared-through-secondary > '$SECONDARY_PVC_MOUNT/$PVC_WORKSPACE_SUBPATH/from-secondary.txt'" +wait_for_pod_file_content \ + "$MASTER_POD" \ + "/workspace/from-secondary.txt" \ + "shared-through-secondary" \ + "master workspace view of secondary mount write" +for WPOD in $WORKER_PODS; do + wait_for_pod_file_content \ + "$WPOD" \ + "/workspace/from-secondary.txt" \ + "shared-through-secondary" \ + "worker workspace view of secondary mount write on $WPOD" +done + echo "Verifying SSH into master pod" "$OKDEV_BIN" --config "$CFG_PATH" --session "$SESSION_NAME" ssh --setup-key --cmd 'echo pytorchjob-ssh-ok' @@ -461,6 +613,22 @@ for WPOD in $WORKER_PODS; do echo "preStop handler verified on $WPOD" done +echo "Creating sibling PVC data outside the synced workspace subPath" +kubectl -n "$NAMESPACE" exec "$MASTER_POD" -c pytorch -- sh -lc \ + "mkdir -p '$SECONDARY_PVC_MOUNT/$PVC_OUTSIDE_WORKSPACE_DIR' && echo keep > '$SECONDARY_PVC_MOUNT/$PVC_OUTSIDE_WORKSPACE_DIR/keep.txt'" +wait_for_pod_file_content \ + "$MASTER_POD" \ + "$SECONDARY_PVC_MOUNT/$PVC_OUTSIDE_WORKSPACE_DIR/keep.txt" \ + "keep" \ + "master sibling PVC data" +for WPOD in $WORKER_PODS; do + wait_for_pod_file_content \ + "$WPOD" \ + "$SECONDARY_PVC_MOUNT/$PVC_OUTSIDE_WORKSPACE_DIR/keep.txt" \ + "keep" \ + "worker sibling PVC data on $WPOD" +done + echo "Creating remote-only stale file before workspace reset" kubectl -n "$NAMESPACE" exec "$MASTER_POD" -c pytorch -- sh -lc 'mkdir -p /workspace/generated && echo stale > /workspace/generated/stale.txt' for WPOD in $WORKER_PODS; do @@ -506,6 +674,20 @@ for WPOD in $WORKER_PODS; do done echo "Workspace reset verified across all pods" +echo "Verifying reset-remote preserved sibling PVC data outside the synced subPath" +wait_for_pod_file_content \ + "$MASTER_POD" \ + "$SECONDARY_PVC_MOUNT/$PVC_OUTSIDE_WORKSPACE_DIR/keep.txt" \ + "keep" \ + "master sibling PVC data after reset" +for WPOD in $WORKER_PODS; do + wait_for_pod_file_content \ + "$WPOD" \ + "$SECONDARY_PVC_MOUNT/$PVC_OUTSIDE_WORKSPACE_DIR/keep.txt" \ + "keep" \ + "worker sibling PVC data after reset on $WPOD" +done + echo "Verifying sync remains active after reset" POST_RESET_STATUS="" for i in $(seq 1 15); do From 4d622e57dce526dfd87b6b2d259bd2cdb626fbb6 Mon Sep 17 00:00:00 2001 From: acmore Date: Mon, 13 Apr 2026 17:26:16 +0800 Subject: [PATCH 4/6] test: handle syncthing ignores in two-phase tests --- internal/cli/syncthing_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/cli/syncthing_test.go b/internal/cli/syncthing_test.go index a6cbdf9..acf10e4 100644 --- a/internal/cli/syncthing_test.go +++ b/internal/cli/syncthing_test.go @@ -653,6 +653,7 @@ func TestRunTwoPhaseInitialSync(t *testing.T) { var ( mu sync.Mutex configPuts int + ignoreCalls int overrideCalls int restartCalls int needBytesValue int64 = 1024 // starts non-zero, switches to 0 after override @@ -687,6 +688,9 @@ func TestRunTwoPhaseInitialSync(t *testing.T) { w.Write([]byte(`{"myID":"DEVICE-ID"}`)) case r.Method == http.MethodGet && r.URL.Path == "/rest/system/connections": w.Write([]byte(`{"connections":{"REMOTE":{"connected":true},"LOCAL":{"connected":true}}}`)) + case r.Method == http.MethodPost && r.URL.Path == "/rest/db/ignores": + ignoreCalls++ + w.WriteHeader(http.StatusOK) case r.Method == http.MethodPost && strings.HasPrefix(r.URL.Path, "/rest/db/override"): overrideCalls++ // Simulate: after override, remote becomes in sync. @@ -728,6 +732,9 @@ func TestRunTwoPhaseInitialSync(t *testing.T) { if overrideCalls < 2 { t.Fatalf("expected /rest/db/override to be called at least twice (phase 1 + 1b), got %d", overrideCalls) } + if ignoreCalls < 2 { + t.Fatalf("expected /rest/db/ignores to be called at least twice (phase 1 + phase 2), got %d", ignoreCalls) + } // Verify neither instance reported restart-required in phase 2. if restartCalls != 0 { @@ -811,6 +818,8 @@ func TestRunTwoPhaseInitialSyncWaitsForDeletionConvergence(t *testing.T) { w.Write([]byte(`{"myID":"DEVICE-ID"}`)) case r.Method == http.MethodGet && r.URL.Path == "/rest/system/connections": w.Write([]byte(`{"connections":{"REMOTE":{"connected":true},"LOCAL":{"connected":true}}}`)) + case r.Method == http.MethodPost && r.URL.Path == "/rest/db/ignores": + w.WriteHeader(http.StatusOK) case r.Method == http.MethodPost && strings.HasPrefix(r.URL.Path, "/rest/db/override"): overrideCalls++ w.WriteHeader(http.StatusOK) @@ -890,6 +899,8 @@ func TestRunTwoPhaseInitialSyncWaitsForPeerConnection(t *testing.T) { connectionPolls++ connected := connectionPolls >= connectionReadyAt fmt.Fprintf(w, `{"connections":{"REMOTE":{"connected":%t},"LOCAL":{"connected":%t}}}`, connected, connected) + case r.Method == http.MethodPost && r.URL.Path == "/rest/db/ignores": + w.WriteHeader(http.StatusOK) case r.Method == http.MethodPost && strings.HasPrefix(r.URL.Path, "/rest/db/override"): overrideCalls++ w.WriteHeader(http.StatusOK) From e3b117ed28b56b9d5d7d60a05d455d8729fdb79f Mon Sep 17 00:00:00 2001 From: acmore Date: Mon, 13 Apr 2026 17:35:43 +0800 Subject: [PATCH 5/6] test: stabilize pytorchjob sync verification --- scripts/e2e_kind_pytorchjob.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/e2e_kind_pytorchjob.sh b/scripts/e2e_kind_pytorchjob.sh index 17e4593..91cb329 100755 --- a/scripts/e2e_kind_pytorchjob.sh +++ b/scripts/e2e_kind_pytorchjob.sh @@ -173,8 +173,6 @@ cat "$CFG_PATH" echo "Generated manifest:" cat "$MANIFEST_PATH" -echo "hello from pytorchjob e2e" >"$SYNC_DIR/hello.txt" - echo "Creating shared workspace PVC" kubectl -n "$NAMESPACE" apply -f - <"$SYNC_DIR/hello.txt" + echo "Waiting for synced file to appear remotely with correct content" SYNC_OK=false for i in $(seq 1 30); do From 56689ec81a021b6759b024e7e6e44f635e08e3da Mon Sep 17 00:00:00 2001 From: acmore Date: Mon, 13 Apr 2026 17:45:31 +0800 Subject: [PATCH 6/6] fix(workload): mirror workspace subpath on sidecar --- internal/kube/podspec.go | 21 ++++++++++++++++----- internal/kube/podspec_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/internal/kube/podspec.go b/internal/kube/podspec.go index 57eef3a..779bc2b 100644 --- a/internal/kube/podspec.go +++ b/internal/kube/podspec.go @@ -57,11 +57,13 @@ func PreparePodSpecForTarget(podSpec corev1.PodSpec, volumes []corev1.Volume, wo }) targetIndex := targetContainerIndex(spec.Containers, targetContainer) + workspaceMount := corev1.VolumeMount{ + Name: "workspace", + MountPath: workspaceMountPath, + } if targetIndex >= 0 { - spec.Containers[targetIndex].VolumeMounts = ensureVolumeMount(spec.Containers[targetIndex].VolumeMounts, corev1.VolumeMount{ - Name: "workspace", - MountPath: workspaceMountPath, - }) + spec.Containers[targetIndex].VolumeMounts = ensureVolumeMount(spec.Containers[targetIndex].VolumeMounts, workspaceMount) + workspaceMount = workspaceMountForSidecar(spec.Containers[targetIndex].VolumeMounts, workspaceMount) spec.Containers[targetIndex].VolumeMounts = ensureVolumeMount(spec.Containers[targetIndex].VolumeMounts, corev1.VolumeMount{ Name: "okdev-runtime", MountPath: "/var/okdev", @@ -99,7 +101,7 @@ func PreparePodSpecForTarget(podSpec corev1.PodSpec, volumes []corev1.Volume, wo {ContainerPort: 22000, Name: "st-sync"}, }, VolumeMounts: []corev1.VolumeMount{ - {Name: "workspace", MountPath: workspaceMountPath}, + workspaceMount, {Name: "syncthing-home", MountPath: "/var/syncthing"}, {Name: "okdev-runtime", MountPath: "/var/okdev"}, }, @@ -188,6 +190,15 @@ func ensureVolumeMount(mounts []corev1.VolumeMount, vm corev1.VolumeMount) []cor return append(mounts, vm) } +func workspaceMountForSidecar(mounts []corev1.VolumeMount, fallback corev1.VolumeMount) corev1.VolumeMount { + for _, mount := range mounts { + if mount.Name == fallback.Name && mount.MountPath == fallback.MountPath { + return mount + } + } + return fallback +} + func ensureEnvVar(envs []corev1.EnvVar, env corev1.EnvVar) []corev1.EnvVar { for i := range envs { if envs[i].Name == env.Name { diff --git a/internal/kube/podspec_test.go b/internal/kube/podspec_test.go index 8f95d61..46fb4ff 100644 --- a/internal/kube/podspec_test.go +++ b/internal/kube/podspec_test.go @@ -95,6 +95,40 @@ func TestPreparePodSpecAddsWorkspaceMountOnDevAndSidecar(t *testing.T) { } } +func TestPreparePodSpecMirrorsTargetWorkspaceSubPathOnSidecar(t *testing.T) { + spec, err := PreparePodSpecForTarget(corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: "ubuntu", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "workspace", + MountPath: "/workspace", + SubPath: "users/e2e/workspace", + }, + }, + }, + }, + }, nil, "/workspace", "ghcr.io/acmore/okdev-sidecar:edge", corev1.ResourceRequirements{}, false, "", "pytorch") + if err != nil { + t.Fatal(err) + } + sidecar := findContainer(spec.Containers, "okdev-sidecar") + if sidecar == nil { + t.Fatal("sidecar container not found") + } + for _, mount := range sidecar.VolumeMounts { + if mount.Name == "workspace" { + if mount.MountPath != "/workspace" || mount.SubPath != "users/e2e/workspace" { + t.Fatalf("expected sidecar workspace mount to mirror target subPath, got %+v", mount) + } + return + } + } + t.Fatalf("expected sidecar workspace mount, got %+v", sidecar.VolumeMounts) +} + func TestPreparePodSpecErrorsOnEmptyImage(t *testing.T) { if _, err := PreparePodSpec(corev1.PodSpec{}, nil, "/workspace", "", corev1.ResourceRequirements{}, false, ""); err == nil { t.Fatal("expected error for empty sidecar image")