Skip to content

Commit

Permalink
fix: Pipeline state during disconnects (#5298)
Browse files Browse the repository at this point in the history
* adjust logging

* set pipeline state to started if it already exists

* fix case condition on pipeline remove

* add logic to deal with terminating pipelines with no dataflow

* remove unused constant

* update case when we can remove pipeline finalizer

* deal with pipelines that are terminating when dataflow restarts

* deal with scheduler restart while pipeline is terminating

* fix tests

* return all pipelines

* added description of pipeline states
  • Loading branch information
sakoush committed Feb 9, 2024
1 parent a0093db commit 94c107d
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 36 deletions.
2 changes: 1 addition & 1 deletion operator/pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ package constants
import "os"

const (
// note: we do not have a finalizer for servers as we rely on the draining logic to reschedule models
ModelFinalizerName = "seldon.model.finalizer"
ServerFinalizerName = "seldon.server.finalizer"
PipelineFinalizerName = "seldon.pipeline.finalizer"
ExperimentFinalizerName = "seldon.experiment.finalizer"
RuntimeFinalizerName = "seldon.runtime.finalizer"
Expand Down
9 changes: 7 additions & 2 deletions operator/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, conn *grp
}

if !pipeline.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Pipeline is pending deletion", "pipeline", pipeline.Name)
logger.Info(
"Pipeline is pending deletion",
"pipeline", pipeline.Name,
"state", pv.State.Status.String(),
)
if canRemovePipelineFinalizer(pv.State.Status) {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestPipeline := &v1alpha1.Pipeline{}
Expand Down Expand Up @@ -246,7 +250,8 @@ func (s *SchedulerClient) updatePipelineStatusImpl(pipeline *v1alpha1.Pipeline)

func canRemovePipelineFinalizer(state scheduler.PipelineVersionState_PipelineStatus) bool {
switch state {
case scheduler.PipelineVersionState_PipelineTerminating:
// wait until the pipeline reaches a terminal state before removing the finalizer; for a delete that state is Terminated
case scheduler.PipelineVersionState_PipelineTerminating, scheduler.PipelineVersionState_PipelineTerminate:
return false
default:
return true
Expand Down
1 change: 1 addition & 0 deletions operator/scheduler/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, conn *grpc.
}
// Handle status update
// This is key for finalizer to remove server when loaded models is zero
// note: now we are not using finalizers for servers
server.Status.LoadedModelReplicas = event.NumLoadedModelReplicas
return s.updateServerStatus(server)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ class PipelineSubscriber(
private val pipelines = ConcurrentHashMap<PipelineId, Pipeline>()

suspend fun subscribe() {
logger.info("will connect to ${upstreamHost}:${upstreamPort}")
while (true) {
logger.info("will connect to ${upstreamHost}:${upstreamPort}")
retry(binaryExponentialBackoff(50..5_000L)) {
logger.debug("retrying to connect to ${upstreamHost}:${upstreamPort}")
subscribePipelines(kafkaConsumerGroupIdPrefix, namespace)
}
}
Expand Down Expand Up @@ -136,7 +137,8 @@ class PipelineSubscriber(
updateEventReason = "kafka topic error"
}
} else {
logger.warn("pipeline ${metadata.id} already exists")
pipelineStarted = true
logger.warn("pipeline ${metadata.name} with id ${metadata.id} already exists")
}

client.pipelineUpdateEvent(
Expand Down
22 changes: 18 additions & 4 deletions scheduler/pkg/kafka/dataflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,28 @@ func contains(slice []string, val string) bool {

func (c *ChainerServer) rebalance() {
logger := c.logger.WithField("func", "rebalance")
// note that we are not retrying PipelineFailed pipelines, consider adding this
evts := c.pipelineHandler.GetAllRunningPipelineVersions()
for _, event := range evts {
pv, err := c.pipelineHandler.GetPipelineVersion(event.PipelineName, event.PipelineVersion, event.UID)
if err != nil {
logger.WithError(err).Errorf("Failed to get pipeline from event %s", event.String())
continue
}
c.logger.Debugf("Rebalancing pipeline %s:%d with state %s", event.PipelineName, event.PipelineVersion, pv.State.Status.String())
c.mu.Lock()
if len(c.streams) == 0 {
pipelineState := pipeline.PipelineCreate
// if no dataflow engines are available, we assume a terminating pipeline can be marked as terminated.
if pv.State.Status == pipeline.PipelineTerminating {
pipelineState = pipeline.PipelineTerminated
}
c.logger.Debugf("No dataflow engines available to handle pipeline %s, setting state to %s", pv.String(), pipelineState.String())
if err := c.pipelineHandler.SetPipelineState(
pv.Name,
pv.Version,
pv.UID,
pipeline.PipelineCreate,
pipelineState,
"no dataflow engines available to handle pipeline",
sourceChainerServer,
); err != nil {
Expand Down Expand Up @@ -393,12 +401,18 @@ func (c *ChainerServer) handlePipelineEvent(event coordinator.PipelineEventMsg)
errMsg := "no dataflow engines available to handle pipeline"
logger.WithField("pipeline", event.PipelineName).Warn(errMsg)

err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, pv.State.Status, errMsg, sourceChainerServer)
status := pv.State.Status
// if no dataflow engines are available, we assume a pipeline that is being terminated can be marked as terminated.
// TODO: however it might be a networking glitch and we need to handle this better in future
if pv.State.Status == pipeline.PipelineTerminating || pv.State.Status == pipeline.PipelineTerminate {
status = pipeline.PipelineTerminated
}
err := c.pipelineHandler.SetPipelineState(pv.Name, pv.Version, pv.UID, status, errMsg, sourceChainerServer)
if err != nil {
logger.
WithError(err).
WithField("pipeline", pv.String()).
WithField("status", pv.State.Status).
WithField("status", status).
Error("failed to set pipeline state")
}

Expand All @@ -420,7 +434,7 @@ func (c *ChainerServer) handlePipelineEvent(event coordinator.PipelineEventMsg)
if err != nil {
logger.WithError(err).Errorf("Failed to set pipeline state to terminating for %s", pv.String())
}
msg := c.createPipelineMessage(pv)
msg := c.createPipelineMessage(pv) // note pv is a copy and does not include the new change to terminating state
c.sendPipelineMsgToSelectedServers(msg, pv)
}
}()
Expand Down
1 change: 1 addition & 0 deletions scheduler/pkg/server/pipeline_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (s *SchedulerServer) sendCurrentPipelineStatuses(stream pb.Scheduler_Subscr
}
for _, p := range pipelines {
resp := createPipelineStatus(p, allVersions)
s.logger.Debugf("Sending pipeline status %s", resp.String())
err = stream.Send(resp)
if err != nil {
return status.Errorf(codes.Internal, err.Error())
Expand Down
3 changes: 3 additions & 0 deletions scheduler/pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package server

import (
"context"
"fmt"
"testing"

. "github.com/onsi/gomega"
Expand Down Expand Up @@ -529,6 +530,8 @@ func TestUnloadPipeline(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
_ = test.server.pipelineHandler.(*pipeline.PipelineStore).InitialiseOrRestoreDB(path)
if test.loadReq != nil {
err := test.server.pipelineHandler.AddPipeline(test.loadReq.Pipeline)
g.Expect(err).To(BeNil())
Expand Down
13 changes: 7 additions & 6 deletions scheduler/pkg/store/pipeline/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ func (pdb *PipelineDBManager) save(pipeline *Pipeline) error {
})
}

// delete removes the entry keyed by the pipeline's name from the store,
// running the removal inside a single db.Update transaction and returning
// whatever error that transaction reports.
func (pdb *PipelineDBManager) delete(pipeline *Pipeline) error {
	removeEntry := func(txn *badger.Txn) error {
		// The pipeline name is the storage key.
		return txn.Delete([]byte(pipeline.Name))
	}
	return pdb.db.Update(removeEntry)
}
// TODO: delete unused pipelines from the store; for now the store grows indefinitely
// func (pdb *PipelineDBManager) delete(pipeline *Pipeline) error {
// return pdb.db.Update(func(txn *badger.Txn) error {
// err := txn.Delete([]byte(pipeline.Name))
// return err
// })
// }

func (pdb *PipelineDBManager) restore(createPipelineCb func(pipeline *Pipeline)) error {
return pdb.db.View(func(txn *badger.Txn) error {
Expand Down
14 changes: 7 additions & 7 deletions scheduler/pkg/store/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ type PipelineStatus uint32

const (
PipelineStatusUnknown PipelineStatus = iota
PipelineCreate
PipelineCreating
PipelineReady
PipelineFailed
PipelineTerminate
PipelineTerminating
PipelineTerminated
PipelineCreate // Received signal to create pipeline.
PipelineCreating // In the process of creating pipeline.
PipelineReady // Pipeline is ready to be used.
PipelineFailed // Pipeline creation/deletion failed.
PipelineTerminate // Received signal that pipeline should be terminated.
PipelineTerminating // In the process of doing cleanup/housekeeping for pipeline termination.
PipelineTerminated // Pipeline has been terminated.
)

type PipelineState struct {
Expand Down
25 changes: 11 additions & 14 deletions scheduler/pkg/store/pipeline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,17 @@ func (ps *PipelineStore) removePipelineImpl(name string) (*coordinator.PipelineE
}
lastState := lastPipelineVersion.State
switch lastState.Status {
case PipelineTerminate:
case PipelineTerminating:
return nil, &PipelineTerminatingErr{pipeline: name}
case PipelineTerminated:
return nil, &PipelineAlreadyTerminatedErr{pipeline: name}
default:
if ps.db != nil {
err := ps.db.delete(pipeline)
if err != nil {
return nil, err
}
}
pipeline.Deleted = true
lastPipelineVersion.State.setState(PipelineTerminate, "pipeline removed")
if err := ps.db.save(pipeline); err != nil {
ps.logger.WithError(err).Errorf("Failed to save pipeline %s", name)
return nil, err
}
return &coordinator.PipelineEventMsg{
PipelineName: lastPipelineVersion.Name,
PipelineVersion: lastPipelineVersion.Version,
Expand Down Expand Up @@ -293,7 +291,8 @@ func (ps *PipelineStore) GetAllRunningPipelineVersions() []coordinator.PipelineE
for _, p := range ps.pipelines {
pv := p.GetLatestPipelineVersion()
switch pv.State.Status {
case PipelineCreate, PipelineCreating, PipelineReady:
// we consider PipelineTerminating as running as it is still active
case PipelineCreate, PipelineCreating, PipelineReady, PipelineTerminating:
events = append(events, coordinator.PipelineEventMsg{
PipelineName: pv.Name,
PipelineVersion: pv.Version,
Expand All @@ -310,13 +309,11 @@ func (ps *PipelineStore) GetPipelines() ([]*Pipeline, error) {

foundPipelines := []*Pipeline{}
for _, p := range ps.pipelines {
if !p.Deleted {
copied, err := copystructure.Copy(p)
if err != nil {
return nil, err
}
foundPipelines = append(foundPipelines, copied.(*Pipeline))
copied, err := copystructure.Copy(p)
if err != nil {
return nil, err
}
foundPipelines = append(foundPipelines, copied.(*Pipeline))
}

return foundPipelines, nil
Expand Down
21 changes: 21 additions & 0 deletions scheduler/pkg/store/pipeline/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ the Change License after the Change Date as each is defined in accordance with t
package pipeline

import (
"fmt"
"testing"

. "github.com/onsi/gomega"
Expand Down Expand Up @@ -289,6 +290,7 @@ func TestRemovePipeline(t *testing.T) {
},
},
},
err: &PipelineTerminatingErr{pipeline: "pipeline"},
},
{
name: "pipeline terminated err",
Expand Down Expand Up @@ -368,16 +370,35 @@ func TestRemovePipeline(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger := logrus.New()
path := fmt.Sprintf("%s/db", t.TempDir())
db, _ := newPipelineDbManager(getPipelineDbFolder(path), logger)
test.store.db = db
err := test.store.RemovePipeline(test.pipelineName)
if test.err == nil {
p := test.store.pipelines[test.pipelineName]
g.Expect(p).ToNot(BeNil())
pv := p.GetLatestPipelineVersion()
g.Expect(pv).ToNot(BeNil())
g.Expect(pv.State.Status).To(Equal(PipelineTerminate))

// check db contains pipeline
pipelines := map[string]*Pipeline{}
restoreCb := func(pipeline *Pipeline) {
pipelines[pipeline.Name] = pipeline
}
_ = test.store.db.restore(restoreCb)
actualPipeline := pipelines[test.pipelineName]
expectedPipeline := test.store.pipelines[test.pipelineName]
// TODO: check all fields
g.Expect(actualPipeline.Deleted).To(Equal(expectedPipeline.Deleted))
g.Expect(actualPipeline.LastVersion).To(Equal(expectedPipeline.LastVersion))
g.Expect(len(actualPipeline.Versions)).To(Equal(len(expectedPipeline.Versions)))
g.Expect(len(actualPipeline.Versions)).To(Equal(len(expectedPipeline.Versions)))
} else {
g.Expect(err.Error()).To(Equal(test.err.Error()))
}
_ = test.store.db.Stop()
})
}
}
Expand Down

0 comments on commit 94c107d

Please sign in to comment.