From 6e3a2082058f2df0b317ec7a105b2b909da4ada1 Mon Sep 17 00:00:00 2001 From: Dorin Geman Date: Tue, 17 Mar 2026 15:47:37 +0200 Subject: [PATCH] fix(scheduling): release lock during runner init and show loading state in ps Signed-off-by: Dorin Geman --- cmd/cli/commands/ps.go | 4 + cmd/cli/desktop/desktop.go | 1 + pkg/inference/scheduling/api.go | 4 +- pkg/inference/scheduling/loader.go | 131 ++++++++++++++++++++------ pkg/inference/scheduling/scheduler.go | 12 ++- 5 files changed, 121 insertions(+), 31 deletions(-) diff --git a/cmd/cli/commands/ps.go b/cmd/cli/commands/ps.go index 77909d050..40270a82b 100644 --- a/cmd/cli/commands/ps.go +++ b/cmd/cli/commands/ps.go @@ -55,6 +55,10 @@ func psTable(ps []desktop.BackendStatus) string { } func formatUntil(status desktop.BackendStatus) string { + if status.Loading { + return "Loading..." + } + keepAlive := inference.KeepAliveDefault if status.KeepAlive != nil { keepAlive = *status.KeepAlive diff --git a/cmd/cli/desktop/desktop.go b/cmd/cli/desktop/desktop.go index 626f4eab9..bb1fbcd5a 100644 --- a/cmd/cli/desktop/desktop.go +++ b/cmd/cli/desktop/desktop.go @@ -747,6 +747,7 @@ type BackendStatus struct { Mode string `json:"mode"` LastUsed time.Time `json:"last_used,omitempty"` InUse bool `json:"in_use,omitempty"` + Loading bool `json:"loading,omitempty"` KeepAlive *inference.KeepAlive `json:"keep_alive,omitempty"` } diff --git a/pkg/inference/scheduling/api.go b/pkg/inference/scheduling/api.go index 10b813201..2dd11df46 100644 --- a/pkg/inference/scheduling/api.go +++ b/pkg/inference/scheduling/api.go @@ -79,7 +79,9 @@ type BackendStatus struct { // LastUsed represents when this (backend, model, mode) tuple was last used LastUsed time.Time `json:"last_used,omitempty"` // InUse indicates whether this backend is currently handling a request - InUse bool `json:"in_use,omitempty"` + InUse bool `json:"in_use,omitempty"` + // Loading indicates whether this backend is currently being initialized + Loading bool `json:"loading,omitempty"` KeepAlive *inference.KeepAlive `json:"keep_alive,omitempty"` } diff --git a/pkg/inference/scheduling/loader.go b/pkg/inference/scheduling/loader.go index 94ba7aced..be2a5f81b 100644 --- a/pkg/inference/scheduling/loader.go +++ b/pkg/inference/scheduling/loader.go @@ -80,6 +80,15 @@ type runnerInfo struct { modelRef string } +// loadingInfo holds metadata about a runner that is being initialized. +type loadingInfo struct { + backendName string + modelID string + draftModelID string + modelRef string + mode inference.BackendMode +} + // loader manages the loading and unloading of backend runners. It regulates // active backends in a manner that avoids exhausting system resources. Loaders // assume that all of their backends have been installed, so no load requests @@ -109,6 +118,12 @@ type loader struct { waiters map[chan<- struct{}]bool // runners maps runner keys to their slot index. runners map[runnerKey]runnerInfo + // loading tracks slots that have a runner being initialized. This + // allows the lock to be released during long-running operations + // (run + wait) while still preventing other goroutines from using + // or evicting those slots. The value contains metadata needed to + // report loading status. + loading map[int]loadingInfo // slots maps slot indices to associated runners. A slot is considered free // if the runner value in it is nil. slots []*runner @@ -157,6 +172,7 @@ func newLoader( guard: make(chan struct{}, 1), waiters: make(map[chan<- struct{}]bool), runners: make(map[runnerKey]runnerInfo, nSlots), + loading: make(map[int]loadingInfo), slots: make([]*runner, nSlots), references: make([]uint, nSlots), timestamps: make([]time.Time, nSlots), @@ -411,6 +427,30 @@ func (l *loader) run(ctx context.Context) { } } +// usedSlots returns the number of slots that are either occupied by a +// registered runner or reserved for a runner being loaded. +func (l *loader) usedSlots() int { + return len(l.runners) + len(l.loading) +} + +// isSlotLoading reports whether the given slot is reserved for a runner +// that is currently being initialized. +func (l *loader) isSlotLoading(slot int) bool { + _, ok := l.loading[slot] + return ok +} + +// isModelLoading reports whether a runner for the given model is currently +// being initialized by another goroutine. +func (l *loader) isModelLoading(backendName, modelID, draftModelID string, mode inference.BackendMode) bool { + for _, info := range l.loading { + if info.backendName == backendName && info.modelID == modelID && info.draftModelID == draftModelID && info.mode == mode { + return true + } + } + return false +} + // load allocates a runner using the specified backend and modelID. If allocated, // it should be released by the caller using the release mechanism (once the // runner is no longer needed). @@ -427,7 +467,9 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string if !l.lock(ctx) { return nil, context.Canceled } - defer l.unlock() + // Note: the lock is managed explicitly throughout this function rather + // than via defer, because it is released during long-running operations + // (run + wait) and re-acquired afterwards. // Get runner configuration if available (must be done under lock since // runnerConfigs can be modified concurrently by setRunnerConfig). @@ -468,22 +510,37 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string // ensure that it's deregistered by the time we return. poll := make(chan struct{}, 1) l.waiters[poll] = true - defer func() { + + // cleanupAndReturn is a helper that cleans up the waiter registration, + // releases the lock, and returns. All exit paths must go through this + // to avoid leaking the poll channel or double-unlocking. + cleanupAndReturn := func(r *runner, err error) (*runner, error) { delete(l.waiters, poll) - }() + l.unlock() + return r, err + } // Loop until we can satisfy the request or an error occurs. + // These are declared outside the loop to avoid goto-over-declaration errors. + var existing runnerInfo + var existingOK bool for { slot := -1 // If loads are disabled, then there's nothing we can do. if !l.loadsEnabled { - return nil, errLoadsDisabled + return cleanupAndReturn(nil, errLoadsDisabled) + } + + // See if another goroutine is already loading this runner. + // If so, wait for it to finish rather than starting a duplicate load. + if l.isModelLoading(backendName, modelID, draftModelID, mode) { + goto WaitForChange } // See if we can satisfy the request with an existing runner. - existing, ok := l.runners[makeRunnerKey(backendName, modelID, draftModelID, mode)] - if ok { + existing, existingOK = l.runners[makeRunnerKey(backendName, modelID, draftModelID, mode)] + if existingOK { select { case <-l.slots[existing.slot].done: l.log.Warn("Runner is defunct, waiting for eviction", "backend", backendName, "model", existing.modelRef) @@ -497,13 +554,13 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string default: l.references[existing.slot]++ l.timestamps[existing.slot] = time.Time{} - return l.slots[existing.slot], nil + return cleanupAndReturn(l.slots[existing.slot], nil) } } - // If all slots are full, try evicting unused runners. - if len(l.runners) == len(l.slots) { - l.log.Info("Evicting to make room", "runners", len(l.runners), "slots", len(l.slots)) + // If all slots are full (including loading reservations), try evicting unused runners. + if l.usedSlots() >= len(l.slots) { + l.log.Info("Evicting to make room", "runners", len(l.runners), "loading", len(l.loading), "slots", len(l.slots)) runnerCountAtLoopStart := len(l.runners) remainingRunners := l.evict(false) // Restart the loop if eviction happened @@ -512,10 +569,10 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string } } - // If there's a free slot, then find the slot. - if len(l.runners) < len(l.slots) { + // If there's a free slot, then find one that is not reserved for loading. + if l.usedSlots() < len(l.slots) { for s, runner := range l.slots { - if runner == nil { + if runner == nil && !l.isSlotLoading(s) { slot = s break } @@ -523,35 +580,51 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string } if slot < 0 { - l.log.Debug("Cannot load model yet", "runners", len(l.runners), "slots", len(l.slots)) + l.log.Debug("Cannot load model yet", "runners", len(l.runners), "loading", len(l.loading), "slots", len(l.slots)) } // If we've identified a slot, then we're ready to start a runner. if slot >= 0 { - // Create the runner. - runner, err := run(l.log, backend, modelID, modelRef, mode, slot, runnerConfig, l.openAIRecorder) + // Reserve the slot and release the lock for the long-running + // operations (run + wait). This allows other goroutines to + // proceed with loading different models, releasing runners, etc. + l.loading[slot] = loadingInfo{ + backendName: backendName, + modelID: modelID, + draftModelID: draftModelID, + modelRef: modelRef, + mode: mode, + } + l.unlock() + + newRunner, err := run(l.log, backend, modelID, modelRef, mode, slot, runnerConfig, l.openAIRecorder) if err != nil { l.log.Warn("Unable to start backend runner", "backend", backendName, "model", modelID, "mode", mode, "error", err) - return nil, fmt.Errorf("unable to start runner: %w", err) + l.lock(context.Background()) + delete(l.loading, slot) + l.broadcast() + return cleanupAndReturn(nil, fmt.Errorf("unable to start runner: %w", err)) } - // Wait for the runner to be ready. In theory it's a little - // inefficient to block all other loaders (including those that - // might not want this runner), but in reality they would probably - // be blocked by the underlying loading anyway (in terms of disk and - // GPU performance). We have to retain a lock here though to enforce - // deduplication of runners and keep slot / memory reservations. - if err := runner.wait(ctx); err != nil { - runner.terminate() + if err := newRunner.wait(ctx); err != nil { + newRunner.terminate() l.log.Warn("Backend runner initialization failed", "backend", backendName, "model", modelID, "mode", mode, "error", err) - return nil, fmt.Errorf("error waiting for runner to be ready: %w", err) + l.lock(context.Background()) + delete(l.loading, slot) + l.broadcast() + return cleanupAndReturn(nil, fmt.Errorf("error waiting for runner to be ready: %w", err)) } + // Re-acquire lock and register the runner. + l.lock(context.Background()) + delete(l.loading, slot) + // Perform registration and return the runner. l.runners[makeRunnerKey(backendName, modelID, draftModelID, mode)] = runnerInfo{slot, modelRef} - l.slots[slot] = runner + l.slots[slot] = newRunner l.references[slot] = 1 - return runner, nil + l.broadcast() + return cleanupAndReturn(newRunner, nil) } // Wait for something to change. Note that we always re-lock with @@ -562,7 +635,7 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string select { case <-ctx.Done(): l.lock(context.Background()) - return nil, context.Canceled + return cleanupAndReturn(nil, context.Canceled) case <-poll: l.lock(context.Background()) } diff --git a/pkg/inference/scheduling/scheduler.go b/pkg/inference/scheduling/scheduler.go index 69273e1ba..a2f2dbdd1 100644 --- a/pkg/inference/scheduling/scheduler.go +++ b/pkg/inference/scheduling/scheduler.go @@ -180,7 +180,7 @@ func (s *Scheduler) getLoaderStatus(ctx context.Context) []BackendStatus { } defer s.loader.unlock() - result := make([]BackendStatus, 0, len(s.loader.runners)) + result := make([]BackendStatus, 0, len(s.loader.runners)+len(s.loader.loading)) for key, runnerInfo := range s.loader.runners { if s.loader.slots[runnerInfo.slot] != nil { @@ -205,6 +205,16 @@ func (s *Scheduler) getLoaderStatus(ctx context.Context) []BackendStatus { } } + // Include models that are currently being loaded. + for _, info := range s.loader.loading { + result = append(result, BackendStatus{ + BackendName: info.backendName, + ModelName: info.modelRef, + Mode: info.mode.String(), + Loading: true, + }) + } + return result }