From 9dfb486bf51263085a8b295c445b2a453fb6c7d2 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Thu, 21 Mar 2024 14:44:45 +0000 Subject: [PATCH] do not mark source in terminate events of old pipelines (#5469) --- scheduler/pkg/store/pipeline/store.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/scheduler/pkg/store/pipeline/store.go b/scheduler/pkg/store/pipeline/store.go index d9bc86102d..74c4128319 100644 --- a/scheduler/pkg/store/pipeline/store.go +++ b/scheduler/pkg/store/pipeline/store.go @@ -342,20 +342,19 @@ func (ps *PipelineStore) terminateOldUnterminatedPipelinesIfNeeded(pipeline *Pip func (ps *PipelineStore) SetPipelineState(name string, versionNumber uint32, uid string, status PipelineStatus, reason string, source string) error { logger := ps.logger.WithField("func", "SetPipelineState") logger.Debugf("Attempt to set state on pipeline %s:%d status:%s", name, versionNumber, status.String()) - evts, err := ps.setPipelineStateImpl(name, versionNumber, uid, status, reason) + evts, err := ps.setPipelineStateImpl(name, versionNumber, uid, status, reason, source) if err != nil { return err } if ps.eventHub != nil { for _, evt := range evts { - evt.Source = source ps.eventHub.PublishPipelineEvent(setStatusPipelineEventSource, *evt) } } return nil } -func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32, uid string, status PipelineStatus, reason string) ([]*coordinator.PipelineEventMsg, error) { +func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32, uid string, status PipelineStatus, reason, source string) ([]*coordinator.PipelineEventMsg, error) { var evts []*coordinator.PipelineEventMsg ps.mu.Lock() defer ps.mu.Unlock() @@ -367,8 +366,12 @@ func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32, PipelineName: pipelineVersion.Name, PipelineVersion: pipelineVersion.Version, UID: pipelineVersion.UID, + Source: source, }) if status == PipelineReady { + // note that we are not setting the source for these events as we do not want to discard them in the chainer service + // i.e in handlePipelineEvent we want to process the termination events even though they are triggered by the chainer + // to set the status of the new version of the pipeline to ready evts = append(evts, ps.terminateOldUnterminatedPipelinesIfNeeded(pipeline)...) } if !pipeline.Deleted && ps.db != nil {