Skip to content

Commit

Permalink
Fix experiment stop (#661)
Browse files Browse the repository at this point in the history
* allow stop experiments for pipelines

* always create an experiment endpoint

* tidy up modelUpdate logic
  • Loading branch information
sakoush committed Nov 29, 2022
1 parent f41be8e commit 71eb34c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
12 changes: 8 additions & 4 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,11 @@ func (p *IncrementalProcessor) experimentUpdate(exp *experiment.Experiment) erro
if exp.Default != nil {
switch exp.ResourceType {
case experiment.PipelineResourceType:
return p.addPipeline(*exp.Default)
logger.Infof("Experiment %s sync - calling for pipeline %s", exp.Name, *exp.Default)
err := p.addPipeline(*exp.Default)
if err != nil {
return err
}
case experiment.ModelResourceType:
logger.Infof("Experiment %s sync - calling for model %s", exp.Name, *exp.Default)
err := p.modelUpdate(*exp.Default)
Expand Down Expand Up @@ -552,19 +556,19 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {
logger.Debugf("sync: No model - removing for %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
return err
}

return p.updateEnvoy() // in practice we should not be here
}

latestModel := model.GetLatest()
if latestModel == nil {
logger.Debugf("sync: No latest model - removing for %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
return err
}
return p.updateEnvoy() // in practice we should not be here
}

if !model.CanReceiveTraffic() {
logger.Debugf("sync: Model can't receive traffic - removing for %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
Expand Down
28 changes: 21 additions & 7 deletions scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,45 +366,59 @@ func (es *ExperimentStore) startExperimentImpl(experiment *Experiment) (*coordin
}

func (es *ExperimentStore) StopExperiment(experimentName string) error {
expEvt, modelEvt, err := es.stopExperimentImpl(experimentName)
expEvt, modelEvt, pipelineEvt, err := es.stopExperimentImpl(experimentName)
if err != nil {
return err
}
if es.eventHub != nil {
if modelEvt != nil {
es.eventHub.PublishModelEvent(experimentStopEventSource, *modelEvt)
}
if pipelineEvt != nil {
es.eventHub.PublishPipelineEvent(experimentStartEventSource, *pipelineEvt)
}
if expEvt != nil {
es.eventHub.PublishExperimentEvent(experimentStopEventSource, *expEvt)
}
}
return nil
}

func (es *ExperimentStore) stopExperimentImpl(experimentName string) (*coordinator.ExperimentEventMsg, *coordinator.ModelEventMsg, error) {
func (es *ExperimentStore) stopExperimentImpl(experimentName string) (*coordinator.ExperimentEventMsg, *coordinator.ModelEventMsg, *coordinator.PipelineEventMsg, error) {
logger := es.logger.WithField("func", "StopExperiment")
logger.Infof("Stop %s", experimentName)
es.mu.Lock()
defer es.mu.Unlock()
if experiment, ok := es.experiments[experimentName]; ok {
var modelEvt *coordinator.ModelEventMsg
var pipelineEvt *coordinator.PipelineEventMsg
experiment.Deleted = true
experiment.Active = false
es.cleanExperimentState(experiment)
if experiment.Default != nil {
modelEvt = &coordinator.ModelEventMsg{
ModelName: *experiment.Default,
switch experiment.ResourceType {
case PipelineResourceType:
pipelineEvt = &coordinator.PipelineEventMsg{
PipelineName: *experiment.Default,
ExperimentUpdate: true,
}
case ModelResourceType:
modelEvt = &coordinator.ModelEventMsg{
ModelName: *experiment.Default,
}
default:
return nil, nil, nil, fmt.Errorf("Unknown resource type %v", experiment.ResourceType)
}
}
if es.db != nil {
err := es.db.delete(experiment)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}
return es.createExperimentEventMsg(experiment, true), modelEvt, nil
return es.createExperimentEventMsg(experiment, true), modelEvt, pipelineEvt, nil
} else {
return nil, nil, &ExperimentNotFound{
return nil, nil, nil, &ExperimentNotFound{
experimentName: experimentName,
}
}
Expand Down

0 comments on commit 71eb34c

Please sign in to comment.