diff --git a/cmd/vm/debug.go b/cmd/vm/debug.go index 8f00caa..eea6992 100644 --- a/cmd/vm/debug.go +++ b/cmd/vm/debug.go @@ -22,9 +22,7 @@ type chDebugSpec struct { CowPath string CHBin string MaxCPU int - Memory int Balloon int - CowSize int DirectBoot bool } @@ -151,8 +149,6 @@ func buildCHDebugSpec(cmd *cobra.Command, storageConfigs []*types.StorageConfig, balloon, _ := cmd.Flags().GetInt("balloon") cowPath, _ := cmd.Flags().GetString("cow") chBin, _ := cmd.Flags().GetString("ch") - memoryMB := int(vmCfg.Memory >> 20) //nolint:mnd - cowSizeGB := int(vmCfg.Storage >> 30) //nolint:mnd // Mirror runtime gating: Windows / sub-MinBalloon VMs never get balloon, // even if the user passed --balloon, so debug output stays truthful. size, ok := hypervisor.BalloonSize(vmCfg.Memory, vmCfg.Windows) @@ -169,9 +165,7 @@ func buildCHDebugSpec(cmd *cobra.Command, storageConfigs []*types.StorageConfig, CowPath: cowPath, CHBin: chBin, MaxCPU: maxCPU, - Memory: memoryMB, Balloon: balloon, - CowSize: cowSizeGB, DirectBoot: boot.KernelPath != "", } } @@ -195,7 +189,7 @@ func printCHDebug(s chDebugSpec) { cocoonLayers, hypervisor.CowSerial) fmt.Println("# Prepare COW disk") - fmt.Printf("truncate -s %dG %s\n", s.CowSize, s.CowPath) + fmt.Printf("truncate -s %dG %s\n", s.VMCfg.Storage>>30, s.CowPath) //nolint:mnd fmt.Printf("mkfs.ext4 -F -m 0 -q -E lazy_itable_init=1,lazy_journal_init=1,discard %s\n", s.CowPath) fmt.Println() fmt.Printf("# Launch VM: %s (image: %s, boot: direct kernel)\n", s.VMCfg.Name, s.VMCfg.Image) @@ -215,8 +209,8 @@ func printCHDebug(s chDebugSpec) { basePath := s.Configs[0].Path fmt.Println("# Prepare COW overlay") fmt.Printf("qemu-img create -f qcow2 -F qcow2 -b %s %s\n", basePath, s.CowPath) - if s.CowSize > 0 { - fmt.Printf("qemu-img resize %s %dG\n", s.CowPath, s.CowSize) + if s.VMCfg.Storage>>30 > 0 { //nolint:mnd // guard on whole GiB so sub-GiB storage never prints "resize ... 0G" + fmt.Printf("qemu-img resize %s %dG\n", s.CowPath, s.VMCfg.Storage>>30) //nolint:mnd } fmt.Println() fmt.Printf("# Launch VM: %s (image: %s, boot: UEFI
firmware)\n", s.VMCfg.Name, s.VMCfg.Image) @@ -239,7 +233,7 @@ func printCommonCHArgs(s chDebugSpec) { memExtra = ",shared=on" } fmt.Printf(" --cpus boot=%d,max=%d%s \\\n", s.VMCfg.CPU, s.MaxCPU, cpuExtra) - fmt.Printf(" --memory size=%dM%s \\\n", s.Memory, memExtra) + fmt.Printf(" --memory size=%dM%s \\\n", s.VMCfg.Memory>>20, memExtra) //nolint:mnd fmt.Printf(" --rng src=/dev/urandom \\\n") if s.Balloon > 0 { fmt.Printf(" --balloon size=%dM,deflate_on_oom=on,free_page_reporting=on \\\n", s.Balloon) diff --git a/cmd/vm/lifecycle.go b/cmd/vm/lifecycle.go index c18ef76..f1a52bc 100644 --- a/cmd/vm/lifecycle.go +++ b/cmd/vm/lifecycle.go @@ -24,8 +24,13 @@ import ( "github.com/cocoonstack/cocoon/utils" ) -// logHeadSigLen spans CH/FC's boot timestamp on line 1. -const logHeadSigLen = 64 +const ( + // logHeadSigLen spans CH/FC's boot timestamp on line 1. + logHeadSigLen = 64 + // logFollowDebounce coalesces fsnotify events on the log file before + // the catch-up io.Copy fires. + logFollowDebounce = 100 * time.Millisecond +) // attachedDevices is the inspect-only view of runtime hot-plugged devices. // Cocoon never persists this structure; it is read from CH vm.info on demand. @@ -217,7 +222,7 @@ func streamLog(ctx context.Context, path string, follow bool, tail int) error { return nil } - events, err := utils.WatchFile(ctx, path, 100*time.Millisecond) //nolint:mnd + events, err := utils.WatchFile(ctx, path, logFollowDebounce) if err != nil { return fmt.Errorf("watch log: %w", err) } diff --git a/cmd/vm/status.go b/cmd/vm/status.go index 86bf2ec..9c93ac1 100644 --- a/cmd/vm/status.go +++ b/cmd/vm/status.go @@ -21,6 +21,10 @@ import ( "github.com/cocoonstack/cocoon/utils" ) +// statusWatchDebounce coalesces fsnotify events on the per-backend index file +// during `vm status` polling. 
+const statusWatchDebounce = 200 * time.Millisecond + type vmEvent struct { Event string `json:"event"` VM types.VM `json:"vm"` @@ -32,6 +36,12 @@ type vmSnapshot struct { memory int64 } +type eventEmitter struct { + begin func() + emit func(event string, snap vmSnapshot, vm types.VM) + end func() +} + func (h Handler) List(cmd *cobra.Command, _ []string) error { ctx, conf, err := h.Init(cmd) if err != nil { @@ -102,7 +112,7 @@ func mergeWatchChannels(ctx context.Context, hypers []hypervisor.Hypervisor) <-c if !ok { continue } - ch, err := utils.WatchFile(ctx, w.WatchPath(), 200*time.Millisecond) //nolint:mnd + ch, err := utils.WatchFile(ctx, w.WatchPath(), statusWatchDebounce) if err == nil { channels = append(channels, ch) } @@ -179,72 +189,70 @@ func statusRefreshLoop(ctx context.Context, hypers []hypervisor.Hypervisor, filt func statusEventLoop(ctx context.Context, hypers []hypervisor.Hypervisor, filters []string, watchCh <-chan struct{}, tick <-chan time.Time) { fmt.Println("EVENT\tID\tNAME\tSTATE\tCPU\tMEMORY\tIP\tIMAGE") //nolint:errcheck - prev := map[string]vmSnapshot{} - runLoop(ctx, watchCh, tick, func() { - vms := listAndFilter(ctx, hypers, filters) - curr := make(map[string]vmSnapshot, len(vms)) - for _, vm := range vms { - curr[vm.ID] = takeSnapshot(vm) - } - - w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) - for id, snap := range curr { - old, existed := prev[id] - switch { - case !existed: - printEventRow(w, "ADDED", snap) - case old != snap: - printEventRow(w, "MODIFIED", snap) - } - } - for id, snap := range prev { - if _, exists := curr[id]; !exists { - printEventRow(w, "DELETED", snap) - } - } - _ = w.Flush() - prev = curr + var w *tabwriter.Writer + statusEventDiffLoop(ctx, hypers, filters, watchCh, tick, eventEmitter{ + begin: func() { w = tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) }, + emit: func(event string, snap vmSnapshot, _ types.VM) { printEventRow(w, event, snap) }, + end: func() { _ = w.Flush() }, }) } func 
statusEventLoopJSON(ctx context.Context, hypers []hypervisor.Hypervisor, filters []string, watchCh <-chan struct{}, tick <-chan time.Time) { enc := json.NewEncoder(os.Stdout) - type snapshotWithVM struct { + statusEventDiffLoop(ctx, hypers, filters, watchCh, tick, eventEmitter{ + emit: func(event string, _ vmSnapshot, vm types.VM) { + _ = enc.Encode(vmEvent{Event: event, VM: vm}) + }, + }) +} + +// statusEventDiffLoop runs the shared diff: snapshot all VMs each tick, compare +// against the previous tick's snapshots, and emit ADDED/MODIFIED/DELETED events +// via emitter. Holds both snap and vm so emitters can choose either format. +func statusEventDiffLoop(ctx context.Context, hypers []hypervisor.Hypervisor, filters []string, watchCh <-chan struct{}, tick <-chan time.Time, emitter eventEmitter) { + type entry struct { snap vmSnapshot vm types.VM } - prev := map[string]snapshotWithVM{} + prev := map[string]entry{} runLoop(ctx, watchCh, tick, func() { vms := listAndFilter(ctx, hypers, filters) - curr := make(map[string]snapshotWithVM, len(vms)) + curr := make(map[string]entry, len(vms)) for _, vm := range vms { - vm.State = types.VMState(cmdcore.ReconcileState(vm)) - curr[vm.ID] = snapshotWithVM{snap: takeSnapshot(vm), vm: *vm} + state := cmdcore.ReconcileState(vm) + vmCopy := *vm + vmCopy.State = types.VMState(state) + curr[vm.ID] = entry{snap: takeSnapshot(vm, state), vm: vmCopy} } - - for id, entry := range curr { + if emitter.begin != nil { + emitter.begin() + } + for id, e := range curr { old, existed := prev[id] switch { case !existed: - _ = enc.Encode(vmEvent{Event: "ADDED", VM: entry.vm}) - case old.snap != entry.snap: - _ = enc.Encode(vmEvent{Event: "MODIFIED", VM: entry.vm}) + emitter.emit("ADDED", e.snap, e.vm) + case old.snap != e.snap: + emitter.emit("MODIFIED", e.snap, e.vm) } } - for id, entry := range prev { + for id, e := range prev { if _, exists := curr[id]; !exists { - _ = enc.Encode(vmEvent{Event: "DELETED", VM: entry.vm}) + 
emitter.emit("DELETED", e.snap, e.vm) } } + if emitter.end != nil { + emitter.end() + } prev = curr }) } -func takeSnapshot(vm *types.VM) vmSnapshot { +func takeSnapshot(vm *types.VM, state string) vmSnapshot { return vmSnapshot{ id: vm.ID, name: vm.Config.Name, - state: cmdcore.ReconcileState(vm), + state: state, cpu: vm.Config.CPU, memory: vm.Config.Memory, ip: vmIPs(vm), @@ -293,7 +301,7 @@ func matchesFilter(vm *types.VM, filters []string) bool { func snapshotAll(vms []*types.VM) []vmSnapshot { result := make([]vmSnapshot, len(vms)) for i, vm := range vms { - result[i] = takeSnapshot(vm) + result[i] = takeSnapshot(vm, cmdcore.ReconcileState(vm)) } return result } diff --git a/cmd/vm/status_test.go b/cmd/vm/status_test.go index fe781c8..f6a5a6a 100644 --- a/cmd/vm/status_test.go +++ b/cmd/vm/status_test.go @@ -78,7 +78,7 @@ func TestVMIPsAndSort(t *testing.T) { t.Fatalf("vmIPs() = %q, want %q", got, "10.0.0.2,10.0.0.3") } - snap := takeSnapshot(vms[0]) + snap := takeSnapshot(vms[0], "running") if snap.id != "1" || snap.name != "earlier" || snap.image != "img-a" { t.Fatalf("takeSnapshot() = %+v", snap) } diff --git a/hypervisor/cloudhypervisor/helper.go b/hypervisor/cloudhypervisor/helper.go index 7c1fb4f..0a8ba36 100644 --- a/hypervisor/cloudhypervisor/helper.go +++ b/hypervisor/cloudhypervisor/helper.go @@ -101,6 +101,17 @@ func vmAPIOnce(ctx context.Context, hc *http.Client, endpoint string, body []byt return utils.DoAPIOnce(ctx, hc, http.MethodPut, "http://localhost/api/v1/"+endpoint, body, successCodes...) } +// vmPutJSON marshals payload and PUTs to endpoint via vmAPIOnce. Mirrors +// firecracker.putJSON so per-endpoint helpers stay one-line wrappers. +func vmPutJSON[T any](ctx context.Context, hc *http.Client, endpoint, kind string, payload T, successCodes ...int) error { + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal %s: %w", kind, err) + } + _, err = vmAPIOnce(ctx, hc, endpoint, body, successCodes...) 
+ return err +} + // shutdownVM, pauseVM, resumeVM are all CH state transitions; a second call // in the wrong state returns an error. Route through vmAPIOnce so a retry // after a lost response cannot mask the original success. @@ -158,34 +169,18 @@ func restoreVM(ctx context.Context, hc *http.Client, sourceDir string, onDemand // masks the original success. Used during clone (cidata hot-plug, NIC swap) // after vm.restore. func addDiskVM(ctx context.Context, hc *http.Client, disk chDisk) error { - body, err := json.Marshal(disk) - if err != nil { - return fmt.Errorf("marshal add-disk request: %w", err) - } - _, err = vmAPIOnce(ctx, hc, "vm.add-disk", body, http.StatusOK, http.StatusNoContent) - return err + return vmPutJSON(ctx, hc, "vm.add-disk", "add-disk request", disk, http.StatusOK, http.StatusNoContent) } // removeDeviceVM is non-idempotent: a retry after CH already detached the // device but the response was lost would surface as "id not found" and mask -// the original success. Route through vmAPIOnce, same shape as the add-* hot -// paths. +// the original success. 
func removeDeviceVM(ctx context.Context, hc *http.Client, deviceID string) error { - body, err := json.Marshal(map[string]string{"id": deviceID}) - if err != nil { - return fmt.Errorf("marshal remove-device request: %w", err) - } - _, err = vmAPIOnce(ctx, hc, "vm.remove-device", body) - return err + return vmPutJSON(ctx, hc, "vm.remove-device", "remove-device request", map[string]string{"id": deviceID}) } func addNetVM(ctx context.Context, hc *http.Client, net chNet) error { - body, err := json.Marshal(net) - if err != nil { - return fmt.Errorf("marshal add-net request: %w", err) - } - _, err = vmAPIOnce(ctx, hc, "vm.add-net", body, http.StatusOK, http.StatusNoContent) - return err + return vmPutJSON(ctx, hc, "vm.add-net", "add-net request", net, http.StatusOK, http.StatusNoContent) } // getVMInfo fetches vm.info; cocoon uses it to detect tag/id conflicts diff --git a/hypervisor/utils.go b/hypervisor/utils.go index adb78e6..17df610 100644 --- a/hypervisor/utils.go +++ b/hypervisor/utils.go @@ -35,6 +35,10 @@ const ( // MinDataDiskSize is the minimum user data disk size; mkfs.ext4 is // unstable below this on small sparse files. MinDataDiskSize int64 = 16 << 20 + + // socketReadyPollInterval is the WaitForSocket poll cadence — VMM socket + // usually appears within a few ms after process start. + socketReadyPollInterval = 1 * time.Millisecond ) // SnapshotFileKind classifies a snapshot file for CloneSnapshotFiles. @@ -75,8 +79,7 @@ func ExtractBlobIDs(storageConfigs []*types.StorageConfig, boot *types.BootConfi return ids } -// BlobHexFromPath strips the directory and extension from a blob path, e.g. -// "/var/lib/cocoon/oci/blobs/abc123.erofs" → "abc123". +// BlobHexFromPath returns the digest hex of a blob path (e.g. .../abc123.erofs → abc123). 
func BlobHexFromPath(path string) string { base := filepath.Base(path) return strings.TrimSuffix(base, filepath.Ext(base)) @@ -418,7 +421,7 @@ func CleanSnapshotFiles(runDir string, match func(name string) bool) error { } func WaitForSocket(ctx context.Context, socketPath string, pid int, timeout time.Duration, processName string) error { - return utils.WaitFor(ctx, timeout, 1*time.Millisecond, func() (bool, error) { //nolint:mnd + return utils.WaitFor(ctx, timeout, socketReadyPollInterval, func() (bool, error) { if utils.CheckSocket(socketPath) == nil { return true, nil } diff --git a/network/cni/create_linux.go b/network/cni/create_linux.go index 5ea2552..2ad228f 100644 --- a/network/cni/create_linux.go +++ b/network/cni/create_linux.go @@ -18,6 +18,9 @@ import ( "github.com/cocoonstack/cocoon/utils" ) +// netnsDeleteRetryInterval polls for async kernel cleanup of a named netns. +const netnsDeleteRetryInterval = 100 * time.Millisecond + // createNetns creates a named netns at /var/run/netns/{name}. func createNetns(name string) error { runtime.LockOSThread() @@ -43,7 +46,7 @@ func createNetns(name string) error { // deleteNetns removes a named netns with retry for async kernel cleanup. func deleteNetns(ctx context.Context, name string) error { - return utils.WaitFor(ctx, time.Second, 100*time.Millisecond, func() (bool, error) { //nolint:mnd + return utils.WaitFor(ctx, time.Second, netnsDeleteRetryInterval, func() (bool, error) { err := netns.DeleteNamed(name) return err == nil || errors.Is(err, fs.ErrNotExist), nil }) diff --git a/snapshot/envelope.go b/snapshot/envelope.go index 9ff879d..5af1eca 100644 --- a/snapshot/envelope.go +++ b/snapshot/envelope.go @@ -1,6 +1,7 @@ package snapshot import ( + "encoding/json" "errors" "fmt" "io/fs" @@ -35,9 +36,21 @@ func ReadSnapshotEnvelope(dir string) (types.SnapshotConfig, error) { return envelope.Config, nil } +// MarshalEnvelope returns the indented snapshot.json bytes for cfg. 
+func MarshalEnvelope(cfg types.SnapshotConfig) ([]byte, error) { + data, err := json.MarshalIndent(types.SnapshotExport{Version: EnvelopeVersion, Config: cfg}, "", " ") + if err != nil { + return nil, fmt.Errorf("marshal snapshot envelope: %w", err) + } + return append(data, '\n'), nil +} + // WriteSnapshotEnvelope writes