Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/cli/commands/ps.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func psTable(ps []desktop.BackendStatus) string {
}

func formatUntil(status desktop.BackendStatus) string {
if status.Loading {
return "Loading..."
}

keepAlive := inference.KeepAliveDefault
if status.KeepAlive != nil {
keepAlive = *status.KeepAlive
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/desktop/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ type BackendStatus struct {
Mode string `json:"mode"`
LastUsed time.Time `json:"last_used,omitempty"`
InUse bool `json:"in_use,omitempty"`
Loading bool `json:"loading,omitempty"`
KeepAlive *inference.KeepAlive `json:"keep_alive,omitempty"`
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/inference/scheduling/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ type BackendStatus struct {
// LastUsed represents when this (backend, model, mode) tuple was last used
LastUsed time.Time `json:"last_used,omitempty"`
// InUse indicates whether this backend is currently handling a request
InUse bool `json:"in_use,omitempty"`
InUse bool `json:"in_use,omitempty"`
// Loading indicates whether this backend is currently being initialized
Loading bool `json:"loading,omitempty"`
KeepAlive *inference.KeepAlive `json:"keep_alive,omitempty"`
}

Expand Down
131 changes: 102 additions & 29 deletions pkg/inference/scheduling/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ type runnerInfo struct {
modelRef string
}

// loadingInfo holds metadata about a runner that is being initialized.
type loadingInfo struct {
backendName string
modelID string
draftModelID string
modelRef string
mode inference.BackendMode
}
Comment thread
doringeman marked this conversation as resolved.

// loader manages the loading and unloading of backend runners. It regulates
// active backends in a manner that avoids exhausting system resources. Loaders
// assume that all of their backends have been installed, so no load requests
Expand Down Expand Up @@ -109,6 +118,12 @@ type loader struct {
waiters map[chan<- struct{}]bool
// runners maps runner keys to their slot index.
runners map[runnerKey]runnerInfo
// loading tracks slots that have a runner being initialized. This
// allows the lock to be released during long-running operations
// (run + wait) while still preventing other goroutines from using
// or evicting those slots. The value contains metadata needed to
// report loading status.
loading map[int]loadingInfo
// slots maps slot indices to associated runners. A slot is considered free
// if the runner value in it is nil.
slots []*runner
Expand Down Expand Up @@ -157,6 +172,7 @@ func newLoader(
guard: make(chan struct{}, 1),
waiters: make(map[chan<- struct{}]bool),
runners: make(map[runnerKey]runnerInfo, nSlots),
loading: make(map[int]loadingInfo),
slots: make([]*runner, nSlots),
references: make([]uint, nSlots),
timestamps: make([]time.Time, nSlots),
Expand Down Expand Up @@ -411,6 +427,30 @@ func (l *loader) run(ctx context.Context) {
}
}

// usedSlots returns the number of slots that are either occupied by a
// registered runner or reserved for a runner being loaded.
func (l *loader) usedSlots() int {
return len(l.runners) + len(l.loading)
}

// isSlotLoading reports whether the given slot is reserved for a runner
// that is currently being initialized.
func (l *loader) isSlotLoading(slot int) bool {
_, ok := l.loading[slot]
return ok
}

// isModelLoading reports whether a runner for the given model is currently
// being initialized by another goroutine.
func (l *loader) isModelLoading(backendName, modelID, draftModelID string, mode inference.BackendMode) bool {
for _, info := range l.loading {
if info.backendName == backendName && info.modelID == modelID && info.draftModelID == draftModelID && info.mode == mode {
return true
}
}
return false
}

// load allocates a runner using the specified backend and modelID. If allocated,
// it should be released by the caller using the release mechanism (once the
// runner is no longer needed).
Expand All @@ -427,7 +467,9 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string
if !l.lock(ctx) {
return nil, context.Canceled
}
defer l.unlock()
// Note: the lock is managed explicitly throughout this function rather
// than via defer, because it is released during long-running operations
// (run + wait) and re-acquired afterwards.

// Get runner configuration if available (must be done under lock since
// runnerConfigs can be modified concurrently by setRunnerConfig).
Expand Down Expand Up @@ -468,22 +510,37 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string
// ensure that it's deregistered by the time we return.
poll := make(chan struct{}, 1)
l.waiters[poll] = true
defer func() {

// cleanupAndReturn is a helper that cleans up the waiter registration,
// releases the lock, and returns. All exit paths must go through this
// to avoid leaking the poll channel or double-unlocking.
cleanupAndReturn := func(r *runner, err error) (*runner, error) {
delete(l.waiters, poll)
}()
l.unlock()
return r, err
}

// Loop until we can satisfy the request or an error occurs.
// These are declared outside the loop to avoid goto-over-declaration errors.
var existing runnerInfo
var existingOK bool
for {
slot := -1

// If loads are disabled, then there's nothing we can do.
if !l.loadsEnabled {
return nil, errLoadsDisabled
return cleanupAndReturn(nil, errLoadsDisabled)
}

Comment thread
doringeman marked this conversation as resolved.
// See if another goroutine is already loading this runner.
// If so, wait for it to finish rather than starting a duplicate load.
if l.isModelLoading(backendName, modelID, draftModelID, mode) {
goto WaitForChange
}

// See if we can satisfy the request with an existing runner.
existing, ok := l.runners[makeRunnerKey(backendName, modelID, draftModelID, mode)]
if ok {
existing, existingOK = l.runners[makeRunnerKey(backendName, modelID, draftModelID, mode)]
if existingOK {
select {
case <-l.slots[existing.slot].done:
l.log.Warn("Runner is defunct, waiting for eviction", "backend", backendName, "model", existing.modelRef)
Expand All @@ -497,13 +554,13 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string
default:
l.references[existing.slot]++
l.timestamps[existing.slot] = time.Time{}
return l.slots[existing.slot], nil
return cleanupAndReturn(l.slots[existing.slot], nil)
}
}

// If all slots are full, try evicting unused runners.
if len(l.runners) == len(l.slots) {
l.log.Info("Evicting to make room", "runners", len(l.runners), "slots", len(l.slots))
// If all slots are full (including loading reservations), try evicting unused runners.
if l.usedSlots() >= len(l.slots) {
l.log.Info("Evicting to make room", "runners", len(l.runners), "loading", len(l.loading), "slots", len(l.slots))
runnerCountAtLoopStart := len(l.runners)
remainingRunners := l.evict(false)
// Restart the loop if eviction happened
Expand All @@ -512,46 +569,62 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string
}
}

// If there's a free slot, then find the slot.
if len(l.runners) < len(l.slots) {
// If there's a free slot, then find one that is not reserved for loading.
if l.usedSlots() < len(l.slots) {
for s, runner := range l.slots {
if runner == nil {
if runner == nil && !l.isSlotLoading(s) {
slot = s
break
}
}
}

if slot < 0 {
l.log.Debug("Cannot load model yet", "runners", len(l.runners), "slots", len(l.slots))
l.log.Debug("Cannot load model yet", "runners", len(l.runners), "loading", len(l.loading), "slots", len(l.slots))
}

// If we've identified a slot, then we're ready to start a runner.
if slot >= 0 {
// Create the runner.
runner, err := run(l.log, backend, modelID, modelRef, mode, slot, runnerConfig, l.openAIRecorder)
// Reserve the slot and release the lock for the long-running
// operations (run + wait). This allows other goroutines to
// proceed with loading different models, releasing runners, etc.
l.loading[slot] = loadingInfo{
backendName: backendName,
modelID: modelID,
draftModelID: draftModelID,
modelRef: modelRef,
mode: mode,
}
Comment thread
doringeman marked this conversation as resolved.
l.unlock()

newRunner, err := run(l.log, backend, modelID, modelRef, mode, slot, runnerConfig, l.openAIRecorder)
if err != nil {
l.log.Warn("Unable to start backend runner", "backend", backendName, "model", modelID, "mode", mode, "error", err)
return nil, fmt.Errorf("unable to start runner: %w", err)
l.lock(context.Background())
Comment thread
doringeman marked this conversation as resolved.
delete(l.loading, slot)
l.broadcast()
return cleanupAndReturn(nil, fmt.Errorf("unable to start runner: %w", err))
}

// Wait for the runner to be ready. In theory it's a little
// inefficient to block all other loaders (including those that
// might not want this runner), but in reality they would probably
// be blocked by the underlying loading anyway (in terms of disk and
// GPU performance). We have to retain a lock here though to enforce
// deduplication of runners and keep slot / memory reservations.
if err := runner.wait(ctx); err != nil {
runner.terminate()
if err := newRunner.wait(ctx); err != nil {
newRunner.terminate()
l.log.Warn("Backend runner initialization failed", "backend", backendName, "model", modelID, "mode", mode, "error", err)
return nil, fmt.Errorf("error waiting for runner to be ready: %w", err)
l.lock(context.Background())
delete(l.loading, slot)
l.broadcast()
return cleanupAndReturn(nil, fmt.Errorf("error waiting for runner to be ready: %w", err))
}

// Re-acquire lock and register the runner.
l.lock(context.Background())
delete(l.loading, slot)

// Perform registration and return the runner.
l.runners[makeRunnerKey(backendName, modelID, draftModelID, mode)] = runnerInfo{slot, modelRef}
l.slots[slot] = runner
l.slots[slot] = newRunner
l.references[slot] = 1
return runner, nil
l.broadcast()
return cleanupAndReturn(newRunner, nil)
}

// Wait for something to change. Note that we always re-lock with
Expand All @@ -562,7 +635,7 @@ func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string
select {
case <-ctx.Done():
l.lock(context.Background())
return nil, context.Canceled
return cleanupAndReturn(nil, context.Canceled)
case <-poll:
l.lock(context.Background())
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/inference/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s *Scheduler) getLoaderStatus(ctx context.Context) []BackendStatus {
}
defer s.loader.unlock()

result := make([]BackendStatus, 0, len(s.loader.runners))
result := make([]BackendStatus, 0, len(s.loader.runners)+len(s.loader.loading))

for key, runnerInfo := range s.loader.runners {
if s.loader.slots[runnerInfo.slot] != nil {
Expand All @@ -205,6 +205,16 @@ func (s *Scheduler) getLoaderStatus(ctx context.Context) []BackendStatus {
}
}

// Include models that are currently being loaded.
for _, info := range s.loader.loading {
result = append(result, BackendStatus{
BackendName: info.backendName,
ModelName: info.modelRef,
Mode: info.mode.String(),
Loading: true,
})
}

return result
}

Expand Down
Loading