Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions cmd/vm/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ type chDebugSpec struct {
CowPath string
CHBin string
MaxCPU int
Memory int
Balloon int
CowSize int
DirectBoot bool
}

Expand Down Expand Up @@ -151,8 +149,6 @@ func buildCHDebugSpec(cmd *cobra.Command, storageConfigs []*types.StorageConfig,
balloon, _ := cmd.Flags().GetInt("balloon")
cowPath, _ := cmd.Flags().GetString("cow")
chBin, _ := cmd.Flags().GetString("ch")
memoryMB := int(vmCfg.Memory >> 20) //nolint:mnd
cowSizeGB := int(vmCfg.Storage >> 30) //nolint:mnd
// Mirror runtime gating: Windows / sub-MinBalloon VMs never get balloon,
// even if the user passed --balloon, so debug output stays truthful.
size, ok := hypervisor.BalloonSize(vmCfg.Memory, vmCfg.Windows)
Expand All @@ -169,9 +165,7 @@ func buildCHDebugSpec(cmd *cobra.Command, storageConfigs []*types.StorageConfig,
CowPath: cowPath,
CHBin: chBin,
MaxCPU: maxCPU,
Memory: memoryMB,
Balloon: balloon,
CowSize: cowSizeGB,
DirectBoot: boot.KernelPath != "",
}
}
Expand All @@ -195,7 +189,7 @@ func printCHDebug(s chDebugSpec) {
cocoonLayers, hypervisor.CowSerial)

fmt.Println("# Prepare COW disk")
fmt.Printf("truncate -s %dG %s\n", s.CowSize, s.CowPath)
fmt.Printf("truncate -s %dG %s\n", s.VMCfg.Storage>>30, s.CowPath) //nolint:mnd
fmt.Printf("mkfs.ext4 -F -m 0 -q -E lazy_itable_init=1,lazy_journal_init=1,discard %s\n", s.CowPath)
fmt.Println()
fmt.Printf("# Launch VM: %s (image: %s, boot: direct kernel)\n", s.VMCfg.Name, s.VMCfg.Image)
Expand All @@ -215,8 +209,8 @@ func printCHDebug(s chDebugSpec) {
basePath := s.Configs[0].Path
fmt.Println("# Prepare COW overlay")
fmt.Printf("qemu-img create -f qcow2 -F qcow2 -b %s %s\n", basePath, s.CowPath)
if s.CowSize > 0 {
fmt.Printf("qemu-img resize %s %dG\n", s.CowPath, s.CowSize)
if s.VMCfg.Storage > 0 {
fmt.Printf("qemu-img resize %s %dG\n", s.CowPath, s.VMCfg.Storage>>30) //nolint:mnd
}
fmt.Println()
fmt.Printf("# Launch VM: %s (image: %s, boot: UEFI firmware)\n", s.VMCfg.Name, s.VMCfg.Image)
Expand All @@ -239,7 +233,7 @@ func printCommonCHArgs(s chDebugSpec) {
memExtra = ",shared=on"
}
fmt.Printf(" --cpus boot=%d,max=%d%s \\\n", s.VMCfg.CPU, s.MaxCPU, cpuExtra)
fmt.Printf(" --memory size=%dM%s \\\n", s.Memory, memExtra)
fmt.Printf(" --memory size=%dM%s \\\n", s.VMCfg.Memory>>20, memExtra) //nolint:mnd
fmt.Printf(" --rng src=/dev/urandom \\\n")
if s.Balloon > 0 {
fmt.Printf(" --balloon size=%dM,deflate_on_oom=on,free_page_reporting=on \\\n", s.Balloon)
Expand Down
11 changes: 8 additions & 3 deletions cmd/vm/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import (
"github.com/cocoonstack/cocoon/utils"
)

// logHeadSigLen spans CH/FC's boot timestamp on line 1.
const logHeadSigLen = 64
const (
// logHeadSigLen spans CH/FC's boot timestamp on line 1.
logHeadSigLen = 64
// logFollowDebounce coalesces fsnotify events on the log file before
// the catch-up io.Copy fires.
logFollowDebounce = 100 * time.Millisecond
)

// attachedDevices is the inspect-only view of runtime hot-plugged devices.
// Cocoon never persists this structure; it is read from CH vm.info on demand.
Expand Down Expand Up @@ -217,7 +222,7 @@ func streamLog(ctx context.Context, path string, follow bool, tail int) error {
return nil
}

events, err := utils.WatchFile(ctx, path, 100*time.Millisecond) //nolint:mnd
events, err := utils.WatchFile(ctx, path, logFollowDebounce)
if err != nil {
return fmt.Errorf("watch log: %w", err)
}
Expand Down
90 changes: 49 additions & 41 deletions cmd/vm/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/cocoonstack/cocoon/utils"
)

// statusWatchDebounce coalesces fsnotify events on the per-backend index file
// during `vm status` polling; it is the debounce window handed to
// utils.WatchFile.
const statusWatchDebounce = 200 * time.Millisecond

type vmEvent struct {
Event string `json:"event"`
VM types.VM `json:"vm"`
Expand All @@ -32,6 +36,12 @@ type vmSnapshot struct {
memory int64
}

// eventEmitter routes diff events from statusEventDiffLoop to one output
// format, so the table and JSON status loops share a single diff engine.
type eventEmitter struct {
	// begin, when non-nil, runs once before each diff pass (the table
	// emitter uses it to create a fresh tabwriter).
	begin func()
	// emit reports one event ("ADDED", "MODIFIED", or "DELETED"); both the
	// snapshot and the full VM are supplied so an emitter can use either.
	emit func(event string, snap vmSnapshot, vm types.VM)
	// end, when non-nil, runs once after each diff pass (the table emitter
	// flushes its tabwriter here).
	end func()
}

func (h Handler) List(cmd *cobra.Command, _ []string) error {
ctx, conf, err := h.Init(cmd)
if err != nil {
Expand Down Expand Up @@ -102,7 +112,7 @@ func mergeWatchChannels(ctx context.Context, hypers []hypervisor.Hypervisor) <-c
if !ok {
continue
}
ch, err := utils.WatchFile(ctx, w.WatchPath(), 200*time.Millisecond) //nolint:mnd
ch, err := utils.WatchFile(ctx, w.WatchPath(), statusWatchDebounce)
if err == nil {
channels = append(channels, ch)
}
Expand Down Expand Up @@ -179,72 +189,70 @@ func statusRefreshLoop(ctx context.Context, hypers []hypervisor.Hypervisor, filt
func statusEventLoop(ctx context.Context, hypers []hypervisor.Hypervisor, filters []string, watchCh <-chan struct{}, tick <-chan time.Time) {
fmt.Println("EVENT\tID\tNAME\tSTATE\tCPU\tMEMORY\tIP\tIMAGE") //nolint:errcheck

prev := map[string]vmSnapshot{}
runLoop(ctx, watchCh, tick, func() {
vms := listAndFilter(ctx, hypers, filters)
curr := make(map[string]vmSnapshot, len(vms))
for _, vm := range vms {
curr[vm.ID] = takeSnapshot(vm)
}

w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
for id, snap := range curr {
old, existed := prev[id]
switch {
case !existed:
printEventRow(w, "ADDED", snap)
case old != snap:
printEventRow(w, "MODIFIED", snap)
}
}
for id, snap := range prev {
if _, exists := curr[id]; !exists {
printEventRow(w, "DELETED", snap)
}
}
_ = w.Flush()
prev = curr
var w *tabwriter.Writer
statusEventDiffLoop(ctx, hypers, filters, watchCh, tick, eventEmitter{
begin: func() { w = tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) },
emit: func(event string, snap vmSnapshot, _ types.VM) { printEventRow(w, event, snap) },
end: func() { _ = w.Flush() },
})
}

func statusEventLoopJSON(ctx context.Context, hypers []hypervisor.Hypervisor, filters []string, watchCh <-chan struct{}, tick <-chan time.Time) {
enc := json.NewEncoder(os.Stdout)
type snapshotWithVM struct {
statusEventDiffLoop(ctx, hypers, filters, watchCh, tick, eventEmitter{
emit: func(event string, _ vmSnapshot, vm types.VM) {
_ = enc.Encode(vmEvent{Event: event, VM: vm})
},
})
}

// statusEventDiffLoop runs the shared diff: snapshot all VMs each tick, compare
// against the previous tick's snapshots, and emit ADDED/MODIFIED/DELETED events
// via emitter. Holds both snap and vm so emitters can choose either format.
func statusEventDiffLoop(ctx context.Context, hypers []hypervisor.Hypervisor, filters []string, watchCh <-chan struct{}, tick <-chan time.Time, emitter eventEmitter) {
type entry struct {
snap vmSnapshot
vm types.VM
}
prev := map[string]snapshotWithVM{}
prev := map[string]entry{}
runLoop(ctx, watchCh, tick, func() {
vms := listAndFilter(ctx, hypers, filters)
curr := make(map[string]snapshotWithVM, len(vms))
curr := make(map[string]entry, len(vms))
for _, vm := range vms {
vm.State = types.VMState(cmdcore.ReconcileState(vm))
curr[vm.ID] = snapshotWithVM{snap: takeSnapshot(vm), vm: *vm}
state := cmdcore.ReconcileState(vm)
vmCopy := *vm
vmCopy.State = types.VMState(state)
curr[vm.ID] = entry{snap: takeSnapshot(vm, state), vm: vmCopy}
}
Comment thread
CMGS marked this conversation as resolved.

for id, entry := range curr {
if emitter.begin != nil {
emitter.begin()
}
for id, e := range curr {
old, existed := prev[id]
switch {
case !existed:
_ = enc.Encode(vmEvent{Event: "ADDED", VM: entry.vm})
case old.snap != entry.snap:
_ = enc.Encode(vmEvent{Event: "MODIFIED", VM: entry.vm})
emitter.emit("ADDED", e.snap, e.vm)
case old.snap != e.snap:
emitter.emit("MODIFIED", e.snap, e.vm)
}
}
for id, entry := range prev {
for id, e := range prev {
if _, exists := curr[id]; !exists {
_ = enc.Encode(vmEvent{Event: "DELETED", VM: entry.vm})
emitter.emit("DELETED", e.snap, e.vm)
}
Comment thread
CMGS marked this conversation as resolved.
}
if emitter.end != nil {
emitter.end()
}
prev = curr
})
}

func takeSnapshot(vm *types.VM) vmSnapshot {
func takeSnapshot(vm *types.VM, state string) vmSnapshot {
return vmSnapshot{
id: vm.ID,
name: vm.Config.Name,
state: cmdcore.ReconcileState(vm),
state: state,
cpu: vm.Config.CPU,
memory: vm.Config.Memory,
ip: vmIPs(vm),
Expand Down Expand Up @@ -293,7 +301,7 @@ func matchesFilter(vm *types.VM, filters []string) bool {
// snapshotAll converts vms into display snapshots, reconciling each VM's
// state on the way so the snapshot reflects actual runtime status.
func snapshotAll(vms []*types.VM) []vmSnapshot {
	result := make([]vmSnapshot, len(vms))
	for i, vm := range vms {
		result[i] = takeSnapshot(vm, cmdcore.ReconcileState(vm))
	}
	return result
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/vm/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestVMIPsAndSort(t *testing.T) {
t.Fatalf("vmIPs() = %q, want %q", got, "10.0.0.2,10.0.0.3")
}

snap := takeSnapshot(vms[0])
snap := takeSnapshot(vms[0], "running")
if snap.id != "1" || snap.name != "earlier" || snap.image != "img-a" {
t.Fatalf("takeSnapshot() = %+v", snap)
}
Expand Down
35 changes: 15 additions & 20 deletions hypervisor/cloudhypervisor/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ func vmAPIOnce(ctx context.Context, hc *http.Client, endpoint string, body []byt
return utils.DoAPIOnce(ctx, hc, http.MethodPut, "http://localhost/api/v1/"+endpoint, body, successCodes...)
}

// vmPutJSON JSON-encodes payload and issues a single PUT to endpoint through
// vmAPIOnce; kind labels the payload in the marshal error. Mirrors
// firecracker.putJSON so per-endpoint helpers stay one-line wrappers.
func vmPutJSON[T any](ctx context.Context, hc *http.Client, endpoint, kind string, payload T, successCodes ...int) error {
	data, merr := json.Marshal(payload)
	if merr != nil {
		return fmt.Errorf("marshal %s: %w", kind, merr)
	}
	if _, err := vmAPIOnce(ctx, hc, endpoint, data, successCodes...); err != nil {
		return err
	}
	return nil
}

// shutdownVM, pauseVM, resumeVM are all CH state transitions; a second call
// in the wrong state returns an error. Route through vmAPIOnce so a retry
// after a lost response cannot mask the original success.
Expand Down Expand Up @@ -158,34 +169,18 @@ func restoreVM(ctx context.Context, hc *http.Client, sourceDir string, onDemand
// masks the original success. Used during clone (cidata hot-plug, NIC swap)
// after vm.restore.
// addDiskVM hot-plugs a disk via vm.add-disk. Non-idempotent (a retry after a
// lost response would mask the original success), so it routes through the
// single-shot vmPutJSON helper.
func addDiskVM(ctx context.Context, hc *http.Client, disk chDisk) error {
	return vmPutJSON(ctx, hc, "vm.add-disk", "add-disk request", disk, http.StatusOK, http.StatusNoContent)
}

// removeDeviceVM is non-idempotent: a retry after CH already detached the
// device but the response was lost would surface as "id not found" and mask
// the original success. Route through vmAPIOnce, same shape as the add-* hot
// paths.
// the original success.
func removeDeviceVM(ctx context.Context, hc *http.Client, deviceID string) error {
body, err := json.Marshal(map[string]string{"id": deviceID})
if err != nil {
return fmt.Errorf("marshal remove-device request: %w", err)
}
_, err = vmAPIOnce(ctx, hc, "vm.remove-device", body)
return err
return vmPutJSON(ctx, hc, "vm.remove-device", "remove-device request", map[string]string{"id": deviceID})
}

// addNetVM hot-plugs a NIC via vm.add-net through the single-shot vmPutJSON
// helper, same shape as addDiskVM.
func addNetVM(ctx context.Context, hc *http.Client, net chNet) error {
	return vmPutJSON(ctx, hc, "vm.add-net", "add-net request", net, http.StatusOK, http.StatusNoContent)
}

// getVMInfo fetches vm.info; cocoon uses it to detect tag/id conflicts
Expand Down
9 changes: 6 additions & 3 deletions hypervisor/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
// MinDataDiskSize is the minimum user data disk size; mkfs.ext4 is
// unstable below this on small sparse files.
MinDataDiskSize int64 = 16 << 20

// socketReadyPollInterval is the WaitForSocket poll cadence — VMM socket
// usually appears within a few ms after process start.
socketReadyPollInterval = 1 * time.Millisecond
)

// SnapshotFileKind classifies a snapshot file for CloneSnapshotFiles.
Expand Down Expand Up @@ -75,8 +79,7 @@ func ExtractBlobIDs(storageConfigs []*types.StorageConfig, boot *types.BootConfi
return ids
}

// BlobHexFromPath strips the directory and extension from a blob path, e.g.
// "/var/lib/cocoon/oci/blobs/abc123.erofs" → "abc123".
// BlobHexFromPath returns the digest hex of a blob path (e.g. .../abc123.erofs → abc123).
func BlobHexFromPath(path string) string {
base := filepath.Base(path)
return strings.TrimSuffix(base, filepath.Ext(base))
Expand Down Expand Up @@ -418,7 +421,7 @@ func CleanSnapshotFiles(runDir string, match func(name string) bool) error {
}

func WaitForSocket(ctx context.Context, socketPath string, pid int, timeout time.Duration, processName string) error {
return utils.WaitFor(ctx, timeout, 1*time.Millisecond, func() (bool, error) { //nolint:mnd
return utils.WaitFor(ctx, timeout, socketReadyPollInterval, func() (bool, error) {
if utils.CheckSocket(socketPath) == nil {
return true, nil
}
Expand Down
5 changes: 4 additions & 1 deletion network/cni/create_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/cocoonstack/cocoon/utils"
)

// netnsDeleteRetryInterval is the retry cadence used by deleteNetns while
// polling for async kernel cleanup of a named netns.
const netnsDeleteRetryInterval = 100 * time.Millisecond

// createNetns creates a named netns at /var/run/netns/{name}.
func createNetns(name string) error {
runtime.LockOSThread()
Expand All @@ -43,7 +46,7 @@ func createNetns(name string) error {

// deleteNetns removes a named netns with retry for async kernel cleanup.
func deleteNetns(ctx context.Context, name string) error {
return utils.WaitFor(ctx, time.Second, 100*time.Millisecond, func() (bool, error) { //nolint:mnd
return utils.WaitFor(ctx, time.Second, netnsDeleteRetryInterval, func() (bool, error) {
err := netns.DeleteNamed(name)
return err == nil || errors.Is(err, fs.ErrNotExist), nil
})
Expand Down
17 changes: 15 additions & 2 deletions snapshot/envelope.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package snapshot

import (
"encoding/json"
"errors"
"fmt"
"io/fs"
Expand Down Expand Up @@ -35,9 +36,21 @@ func ReadSnapshotEnvelope(dir string) (types.SnapshotConfig, error) {
return envelope.Config, nil
}

// MarshalEnvelope renders cfg as the indented snapshot.json document,
// wrapped in a versioned SnapshotExport and terminated by a newline.
func MarshalEnvelope(cfg types.SnapshotConfig) ([]byte, error) {
	export := types.SnapshotExport{Version: EnvelopeVersion, Config: cfg}
	out, err := json.MarshalIndent(export, "", " ")
	if err != nil {
		return nil, fmt.Errorf("marshal snapshot envelope: %w", err)
	}
	out = append(out, '\n')
	return out, nil
}

// WriteSnapshotEnvelope writes <dir>/snapshot.json atomically so a concurrent
// reader can't see a partial write. The bytes come from MarshalEnvelope so
// file content and the exported form stay identical.
func WriteSnapshotEnvelope(dir string, cfg types.SnapshotConfig) error {
	data, err := MarshalEnvelope(cfg)
	if err != nil {
		return err
	}
	return utils.AtomicWriteFile(filepath.Join(dir, SnapshotJSONName), data, 0o644)
}
Loading
Loading