Skip to content

Commit

Permalink
do not mark source in terminate events of old pipelines (#5469)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush committed Mar 21, 2024
1 parent a9ccedd commit 9dfb486
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions scheduler/pkg/store/pipeline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,20 +342,19 @@ func (ps *PipelineStore) terminateOldUnterminatedPipelinesIfNeeded(pipeline *Pip
func (ps *PipelineStore) SetPipelineState(name string, versionNumber uint32, uid string, status PipelineStatus, reason string, source string) error {
logger := ps.logger.WithField("func", "SetPipelineState")
logger.Debugf("Attempt to set state on pipeline %s:%d status:%s", name, versionNumber, status.String())
evts, err := ps.setPipelineStateImpl(name, versionNumber, uid, status, reason)
evts, err := ps.setPipelineStateImpl(name, versionNumber, uid, status, reason, source)
if err != nil {
return err
}
if ps.eventHub != nil {
for _, evt := range evts {
evt.Source = source
ps.eventHub.PublishPipelineEvent(setStatusPipelineEventSource, *evt)
}
}
return nil
}

func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32, uid string, status PipelineStatus, reason string) ([]*coordinator.PipelineEventMsg, error) {
func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32, uid string, status PipelineStatus, reason, source string) ([]*coordinator.PipelineEventMsg, error) {
var evts []*coordinator.PipelineEventMsg
ps.mu.Lock()
defer ps.mu.Unlock()
Expand All @@ -367,8 +366,12 @@ func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32,
PipelineName: pipelineVersion.Name,
PipelineVersion: pipelineVersion.Version,
UID: pipelineVersion.UID,
Source: source,
})
if status == PipelineReady {
// note that we are not setting the source for these events as we do not want to discard them in the chainer service
// i.e in handlePipelineEvent we want to process the termination events even though they are triggered by the chainer
// to set the status of the new version of the pipeline to ready
evts = append(evts, ps.terminateOldUnterminatedPipelinesIfNeeded(pipeline)...)
}
if !pipeline.Deleted && ps.db != nil {
Expand Down

0 comments on commit 9dfb486

Please sign in to comment.