diff --git a/scheduler/pkg/agent/server.go b/scheduler/pkg/agent/server.go index fc83f41f9b..a14d1a582c 100644 --- a/scheduler/pkg/agent/server.go +++ b/scheduler/pkg/agent/server.go @@ -43,7 +43,7 @@ import ( const ( grpcMaxConcurrentStreams = 1_000_000 - pendingSyncsQueueSize int = 10 + pendingSyncsQueueSize int = 1000 modelEventHandlerName = "agent.server.models" modelScalingCoolingDownSeconds = 60 // this is currently used in scale down events serverDrainingExtraWaitMillis = 500 diff --git a/scheduler/pkg/agent/server_test.go b/scheduler/pkg/agent/server_test.go index 2e0166e41c..70bd8dfcf5 100644 --- a/scheduler/pkg/agent/server_test.go +++ b/scheduler/pkg/agent/server_test.go @@ -38,7 +38,7 @@ type mockStore struct { var _ store.ModelStore = (*mockStore)(nil) -func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (m *mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { } func (m *mockStore) UpdateModel(config *pbs.LoadModelRequest) error { diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index bf10ae93ab..1984c4be63 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -41,7 +41,7 @@ import ( ) const ( - pendingSyncsQueueSize int = 100 + pendingSyncsQueueSize int = 1000 modelEventHandlerName = "incremental.processor.models" experimentEventHandlerName = "incremental.processor.experiments" pipelineEventHandlerName = "incremental.processor.pipelines" @@ -63,6 +63,7 @@ type IncrementalProcessor struct { batchWaitMillis time.Duration pendingModelVersions []*pendingModelVersion versionCleaner cleaner.ModelVersionCleaner + batchTriggerManual *time.Time } type pendingModelVersion struct { @@ -94,6 +95,7 @@ func NewIncrementalProcessor( batchTrigger: nil, batchWaitMillis: util.EnvoyUpdateDefaultBatchWaitMillis, versionCleaner: versionCleaner, + batchTriggerManual: nil, } err := ip.setListeners() @@ -542,13 +544,15 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { p.mu.Lock() defer p.mu.Unlock() p.modelStore.LockModel(modelName) - defer p.modelStore.UnlockModel(modelName) + + logger.Debugf("Calling model update for %s", modelName) model, err := p.modelStore.GetModel(modelName) if err != nil { logger.WithError(err).Warnf("Failed to sync model %s", modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) + p.modelStore.UnlockModel(modelName) return err } } @@ -557,6 +561,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) } + p.modelStore.UnlockModel(modelName) return p.updateEnvoy() // in practice we should not be here } @@ -566,6 +571,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) } + p.modelStore.UnlockModel(modelName) return p.updateEnvoy() // in practice we should not be here } @@ -577,6 +583,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { logger.Debugf("sync: Model can't receive traffic - removing for %s", modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) + p.modelStore.UnlockModel(modelName) return err } modelRemoved = true @@ -598,6 +605,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { // Remove routes before we recreate if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.Debugf("Failed to remove route before starting update for %s", modelName) + p.modelStore.UnlockModel(modelName) return err } @@ -608,6 +616,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { logger.WithError(err).Errorf("Failed to add traffic for model %s", modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) + p.modelStore.UnlockModel(modelName) return err } } @@ -621,9 +630,14 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { version: latestModel.GetVersion(), }, ) + p.modelStore.UnlockModel(modelName) + triggered := p.triggerModelSyncIfNeeded() - if p.batchTrigger == nil && p.runEnvoyBatchUpdates { - p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSync) + if !triggered { + // we still need to enable the cron timer as there is no guarantee that the manual trigger will be called + if p.batchTrigger == nil && p.runEnvoyBatchUpdates { + p.batchTrigger = time.AfterFunc(p.batchWaitMillis, p.modelSyncWithLock) + } } return nil @@ -640,10 +654,33 @@ func (p *IncrementalProcessor) callVersionCleanupIfNeeded(modelName string) { } } -func (p *IncrementalProcessor) modelSync() { - logger := p.logger.WithField("func", "modelSync") +func (p *IncrementalProcessor) triggerModelSyncIfNeeded() bool { + // the first time we trigger the batch update we need to set the time + if p.batchTriggerManual == nil { + p.batchTriggerManual = new(time.Time) + *p.batchTriggerManual = time.Now() + } + if time.Since(*p.batchTriggerManual) > p.batchWaitMillis { + // we have waited long enough so we can trigger the batch update + // we do this inline so that we do not require to release and reacquire the lock + // which under heavy load there is no guarantee of order and therefore could lead + // to starvation of the batch update + p.modelSync() + *p.batchTriggerManual = time.Now() + return true + } + return false +} + +func (p *IncrementalProcessor) modelSyncWithLock() { p.mu.Lock() defer p.mu.Unlock() + p.modelSync() +} + +func (p *IncrementalProcessor) modelSync() { + logger := p.logger.WithField("func", "modelSync") + logger.Debugf("Calling model sync") envoyErr := p.updateEnvoy() serverReplicaState := store.Available @@ -728,4 +765,5 @@ func (p *IncrementalProcessor) modelSync() { // Reset p.batchTrigger = nil p.pendingModelVersions = nil + logger.Debugf("Done modelSync") } diff --git a/scheduler/pkg/envoy/processor/incremental_test.go b/scheduler/pkg/envoy/processor/incremental_test.go index bc9fd8ca9b..9efbcc46d4 100644 --- a/scheduler/pkg/envoy/processor/incremental_test.go +++ b/scheduler/pkg/envoy/processor/incremental_test.go @@ -766,7 +766,7 @@ func TestModelSync(t *testing.T) { for _, op := range test.ops { op(inc, g) } - inc.modelSync() + inc.modelSyncWithLock() for modelName, modelReplicas := range test.expectedReplicaStats { model, err := inc.modelStore.GetModel(modelName) g.Expect(err).To(BeNil()) diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go index 431749b205..124def8a09 100644 --- a/scheduler/pkg/scheduler/scheduler.go +++ b/scheduler/pkg/scheduler/scheduler.go @@ -149,9 +149,10 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail) s.sortServers(latestModel, filteredServers) ok := false - logger.Debugf("Model %s candidate servers %v", modelName, filteredServers) + logger.Debugf("Model %s with desired replicas %d candidate servers %v", modelName, latestModel.DesiredReplicas(), filteredServers) // For each server filter and sort replicas and attempt schedule if enough replicas for _, candidateServer := range filteredServers { + logger.Debugf("Candidate server %s", candidateServer.Name) var candidateReplicas *sorters.CandidateServer // we need a lock here, we could have many goroutines at sorting @@ -175,7 +176,8 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error { } if !ok { failureErrMsg := fmt.Sprintf("failed to schedule model %s. %v", modelName, debugTrail) - s.store.FailedScheduling(latestModel, failureErrMsg) + // we do not want to reset the server if it has live replicas + s.store.FailedScheduling(latestModel, failureErrMsg, !latestModel.HasLiveReplicas()) return fmt.Errorf(failureErrMsg) } } diff --git a/scheduler/pkg/scheduler/scheduler_test.go b/scheduler/pkg/scheduler/scheduler_test.go index 0096265319..1081f1f5af 100644 --- a/scheduler/pkg/scheduler/scheduler_test.go +++ b/scheduler/pkg/scheduler/scheduler_test.go @@ -38,7 +38,7 @@ type mockStore struct { var _ store.ModelStore = (*mockStore)(nil) -func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (f mockStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { } func (f mockStore) UnloadVersionModels(modelKey string, version uint32) (bool, error) { diff --git a/scheduler/pkg/store/experiment/store.go b/scheduler/pkg/store/experiment/store.go index d2ec00bfe8..9fb8b30743 100644 --- a/scheduler/pkg/store/experiment/store.go +++ b/scheduler/pkg/store/experiment/store.go @@ -31,7 +31,7 @@ import ( ) const ( - pendingSyncsQueueSize int = 100 + pendingSyncsQueueSize int = 1000 experimentStartEventSource = "experiment.store.start" experimentStopEventSource = "experiment.store.stop" modelEventHandlerName = "experiment.store.models" diff --git a/scheduler/pkg/store/experiment/store_test.go b/scheduler/pkg/store/experiment/store_test.go index 528fdea6ac..809641a224 100644 --- a/scheduler/pkg/store/experiment/store_test.go +++ b/scheduler/pkg/store/experiment/store_test.go @@ -399,7 +399,7 @@ func (f fakeModelStore) DrainServerReplica(serverName string, replicaIdx int) ([ panic("implement me") } -func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { panic("implement me") } diff --git a/scheduler/pkg/store/memory_status.go b/scheduler/pkg/store/memory_status.go index 89eceb4d2f..e262259a5a 100644 --- a/scheduler/pkg/store/memory_status.go +++ b/scheduler/pkg/store/memory_status.go @@ -110,7 +110,7 @@ func updateModelState(isLatest bool, modelVersion *ModelVersion, prevModelVersio } } -func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string) { +func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) { modelVersion.state = ModelStatus{ State: ScheduleFailed, Reason: reason, @@ -118,8 +118,10 @@ func (m *MemoryStore) FailedScheduling(modelVersion *ModelVersion, reason string AvailableReplicas: modelVersion.state.AvailableReplicas, UnavailableReplicas: modelVersion.GetModel().GetDeploymentSpec().GetReplicas() - modelVersion.state.AvailableReplicas, } - // make sure we reset server - modelVersion.server = "" + // make sure we reset server but only if there are no available replicas + if reset { + modelVersion.server = "" + } m.eventHub.PublishModelEvent( modelFailureEventSource, coordinator.ModelEventMsg{ diff --git a/scheduler/pkg/store/pipeline/status_test.go b/scheduler/pkg/store/pipeline/status_test.go index ea74c9e34f..f4dd15f06b 100644 --- a/scheduler/pkg/store/pipeline/status_test.go +++ b/scheduler/pkg/store/pipeline/status_test.go @@ -95,7 +95,7 @@ func (f fakeModelStore) RemoveServerReplica(serverName string, replicaIdx int) ( panic("implement me") } -func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string) { +func (f fakeModelStore) FailedScheduling(modelVersion *store.ModelVersion, reason string, reset bool) { panic("implement me") } diff --git a/scheduler/pkg/store/pipeline/store.go b/scheduler/pkg/store/pipeline/store.go index d5b59538aa..0bd9df9364 100644 --- a/scheduler/pkg/store/pipeline/store.go +++ b/scheduler/pkg/store/pipeline/store.go @@ -31,7 +31,7 @@ import ( ) const ( - pendingSyncsQueueSize int = 100 + pendingSyncsQueueSize int = 1000 addPipelineEventSource = "pipeline.store.addpipeline" removePipelineEventSource = "pipeline.store.removepipeline" setStatusPipelineEventSource = "pipeline.store.setstatus" diff --git a/scheduler/pkg/store/store.go b/scheduler/pkg/store/store.go index 2edfdcb7d4..a13dd8afa5 100644 --- a/scheduler/pkg/store/store.go +++ b/scheduler/pkg/store/store.go @@ -126,6 +126,6 @@ type ModelStore interface { ServerNotify(request *pb.ServerNotifyRequest) error RemoveServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models DrainServerReplica(serverName string, replicaIdx int) ([]string, error) // return previously loaded models - FailedScheduling(modelVersion *ModelVersion, reason string) + FailedScheduling(modelVersion *ModelVersion, reason string, reset bool) GetAllModels() []string }