diff --git a/internal/armada/server/submit_from_log.go b/internal/armada/server/submit_from_log.go index 846e4ef7339..804dfde6c0c 100644 --- a/internal/armada/server/submit_from_log.go +++ b/internal/armada/server/submit_from_log.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "reflect" "time" "github.com/apache/pulsar-client-go/pulsar" @@ -75,7 +76,7 @@ func (srv *SubmitFromLog) Run(ctx context.Context) { lastLogged = time.Now() } - // Check if the context has been cancelled, and, if not, get a message from Pulsar. + // Exit if the context has been cancelled. Otherwise, get a message from Pulsar. select { case <-ctx.Done(): log.WithFields( @@ -89,14 +90,18 @@ func (srv *SubmitFromLog) Run(ctx context.Context) { }, ).Info("shutdown signal received") break - default: // Get a message from Pulsar + default: + + // Get a message from Pulsar, which consists of a sequence of events (i.e., state transitions). ctxWithTimeout, _ := context.WithTimeout(ctx, time.Second) msg, err := srv.Consumer.Receive(ctxWithTimeout) if errors.Is(err, context.DeadlineExceeded) { continue //expected } + + // If receiving fails, wait for a bit, since the problem may be transient. if err != nil { - log.WithField("lastMessageId", lastMessageId).WithError(err).Warnf("Pulsar receive failed; ignoring") + log.WithField("lastMessageId", lastMessageId).WithError(err).Warnf("Pulsar receive failed; backing off") time.Sleep(time.Second) continue } @@ -113,34 +118,51 @@ func (srv *SubmitFromLog) Run(ctx context.Context) { } } - // TODO: Determine what to do based on the error (e.g., publish to a dead letter topic). + // Put the requestId into a message-specific context and logger, + // which are passed on to sub-functions. messageCtx, ok := requestid.AddToIncomingContext(ctx, requestId) if !ok { messageCtx = ctx } messageLogger := log.WithFields(logrus.Fields{"messageId": msg.ID(), requestid.MetadataKey: requestId}) ctxWithLogger := ctxlogrus.ToContext(messageCtx, messageLogger) - err = srv.ProcessMessage(ctxWithLogger, msg) + + // Unmarshal and validate the message. + sequence, err := srv.UnmarshalEventSequence(ctxWithLogger, msg.Payload()) if err != nil { messageLogger.WithError(err).Warnf("processing message failed; ignoring") - time.Sleep(time.Second) numErrored++ + continue + } + + // Process the events in the sequence. + // For efficiency, we may process several events at a time. + // To maintain ordering, we only do so for subsequences of consecutive events with equal type. + // TODO: Determine what to do based on the error (e.g., publish to a dead letter topic). + i := 0 + for i < len(sequence.Events) { + j, err := srv.ProcessSubSequence(ctxWithLogger, i, sequence) + if err != nil { + messageLogger.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).WithError(err).Warnf("processing subsequence failed; ignoring") + numErrored++ + } + if j == i { + j++ // Make sure we make progress in case of bugs in srv.ProcessSubSequence + } + i = j } } } } -// ProcessMessage processes a single Pulsar message (i.e., an event sequence) -// by decomposing the events in the sequence by type and calling into the embedded SubmitServer. -func (srv *SubmitFromLog) ProcessMessage(ctx context.Context, msg pulsar.Message) (err error) { - log := ctxlogrus.Extract(ctx) - log.Info("processing Pulsar message") - +// UnmarshalEventSequence returns an EventSequence object contained in a byte buffer +// after validating that the resulting EventSequence is valid. +func (srv *SubmitFromLog) UnmarshalEventSequence(ctx context.Context, payload []byte) (*events.EventSequence, error) { sequence := &events.EventSequence{} - err = proto.Unmarshal(msg.Payload(), sequence) + err := proto.Unmarshal(payload, sequence) if err != nil { err = errors.WithStack(err) - return + return nil, err } if sequence.JobSetName == "" { @@ -150,8 +172,9 @@ func (srv *SubmitFromLog) ProcessMessage(ctx context.Context, msg pulsar.Message Message: "JobSetName not provided", } err = errors.WithStack(err) - return + return nil, err } + if sequence.UserId == "" { err = &armadaerrors.ErrInvalidArgument{ Name: "UserId", @@ -159,8 +182,9 @@ func (srv *SubmitFromLog) ProcessMessage(ctx context.Context, msg pulsar.Message Message: "UserId not provided", } err = errors.WithStack(err) - return + return nil, err } + if sequence.Queue == "" { err = &armadaerrors.ErrInvalidArgument{ Name: "Queue", @@ -168,56 +192,163 @@ func (srv *SubmitFromLog) ProcessMessage(ctx context.Context, msg pulsar.Message Message: "Queue name not provided", } err = errors.WithStack(err) - return + return nil, err } + if sequence.Groups == nil { sequence.Groups = make([]string, 0) } - // Split the event sequence by type - submitJobEvents := make([]*events.SubmitJob, 0) - cancelJobEvents := make([]*events.CancelJob, 0) - reprioritiseJobEvents := make([]*events.ReprioritiseJob, 0) - reprioritiseJobSetEvents := make([]*events.ReprioritiseJobSet, 0) - for i, event := range sequence.Events { - switch e := event.Event.(type) { - case *events.EventSequence_Event_SubmitJob: - submitJobEvents = append(submitJobEvents, e.SubmitJob) - case *events.EventSequence_Event_CancelJob: - cancelJobEvents = append(cancelJobEvents, e.CancelJob) - case *events.EventSequence_Event_ReprioritiseJob: - reprioritiseJobEvents = append(reprioritiseJobEvents, e.ReprioritiseJob) - case *events.EventSequence_Event_ReprioritiseJobSet: - reprioritiseJobSetEvents = append(reprioritiseJobSetEvents, e.ReprioritiseJobSet) - default: - log.WithFields(logrus.Fields{"index": i, "event": event}).Warn("received unsupported Pulsar event") + if sequence.Events == nil { + err = &armadaerrors.ErrInvalidArgument{ + Name: "Events", + Value: nil, + Message: "no events in sequence", } + err = errors.WithStack(err) + return nil, err } + return sequence, nil +} - // Submit to Redis and Nats. - // Collect all errors as sibling errors. - var result *multierror.Error - result = multierror.Append( - result, - srv.SubmitJobs(ctx, sequence.UserId, sequence.Groups, sequence.Queue, sequence.JobSetName, submitJobEvents), - ) - result = multierror.Append( - result, - srv.CancelJobs(ctx, sequence.UserId, cancelJobEvents), - ) - result = multierror.Append( - result, - srv.ReprioritizeJobs(ctx, sequence.UserId, reprioritiseJobEvents), - ) - result = multierror.Append( - result, - srv.ReprioritizeJobSets(ctx, sequence.UserId, sequence.Queue, sequence.JobSetName, reprioritiseJobSetEvents), - ) - - err = result.ErrorOrNil() +// ProcessSubSequence processes sequence.Events[i:j-1], where j is the index of the first event in the sequence +// of a type different from that of sequence.Events[i], or len(sequence.Events) if no such event exists in the sequence, +// and returns j. +// +// Processing one such subsequence at a time preserves ordering between events of different types. +// For example, SubmitJob events are processed before CancelJob events that occur later in the sequence. +// +// Events are processed by calling into the embedded srv.SubmitServer. +func (srv *SubmitFromLog) ProcessSubSequence(ctx context.Context, i int, sequence *events.EventSequence) (j int, err error) { + j = i + 1 // Return the next index to ensure that processing continues in case of unhandled errors at the caller + if i < 0 || i >= len(sequence.Events) { + err = &armadaerrors.ErrInvalidArgument{ + Name: "i", + Value: i, + Message: fmt.Sprintf("tried to index into a list composed of %d elements", len(sequence.Events)), + } + err = errors.WithStack(err) + return + } + + // Process the subsequence starting at the i-th event consisting of all consecutive events of the same type. + switch sequence.Events[i].Event.(type) { + case *events.EventSequence_Event_SubmitJob: + es := collectJobSubmitEvents(ctx, i, sequence) + err = srv.SubmitJobs(ctx, sequence.UserId, sequence.Groups, sequence.Queue, sequence.JobSetName, es) + err = errors.WithStack(err) + j = i + len(es) + case *events.EventSequence_Event_CancelJob: + es := collectCancelJobEvents(ctx, i, sequence) + err = srv.CancelJobs(ctx, sequence.UserId, es) + err = errors.WithStack(err) + j = i + len(es) + case *events.EventSequence_Event_CancelJobSet: + es := collectCancelJobSetEvents(ctx, i, sequence) + err = srv.CancelJobSets(ctx, sequence.Queue, sequence.JobSetName, es) + err = errors.WithStack(err) + j = i + len(es) + case *events.EventSequence_Event_ReprioritiseJob: + es := collectReprioritiseJobEvents(ctx, i, sequence) + err = srv.ReprioritizeJobs(ctx, sequence.UserId, es) + err = errors.WithStack(err) + j = i + len(es) + case *events.EventSequence_Event_ReprioritiseJobSet: + es := collectReprioritiseJobSetEvents(ctx, i, sequence) + err = srv.ReprioritizeJobSets(ctx, sequence.UserId, sequence.Queue, sequence.JobSetName, es) + err = errors.WithStack(err) + j = i + len(es) + // Events not handled by this processor. Since the legacy scheduler writes these transitions directly to the db. + //case *events.EventSequence_Event_JobSucceeded: + //case *events.EventSequence_Event_JobFailed: + //case *events.EventSequence_Event_JobRejected: + //case *events.EventSequence_Event_JobRunLeased: + //case *events.EventSequence_Event_JobRunAssigned: + //case *events.EventSequence_Event_JobRunRunning: + //case *events.EventSequence_Event_JobRunReturned: + //case *events.EventSequence_Event_JobRunSucceeded: + //case *events.EventSequence_Event_JobRunFailed: + default: + err = &armadaerrors.ErrInvalidArgument{ + Name: fmt.Sprintf("Events[%d]", i), + Value: sequence.Events[i], + Message: "received unsupported Pulsar event", + } + err = errors.WithStack(err) + + // Assign to j the index of the next event in the sequence with type different from sequence.Events[i], + // or len(sequence.Events) if no such element exists, so that processing won't be attempted for this type again. + j = i + t := reflect.TypeOf(sequence.Events[i].Event) + for j < len(sequence.Events) && reflect.TypeOf(sequence.Events[j].Event) == t { + j++ + } + } return } +// collectJobSubmitEvents (and the corresponding functions for other types below) +// return a slice of events starting at index i in the sequence with equal type. +func collectJobSubmitEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.SubmitJob { + result := make([]*events.SubmitJob, 0) + for j := i; j < len(sequence.Events); j++ { + if e, ok := sequence.Events[j].Event.(*events.EventSequence_Event_SubmitJob); ok { + result = append(result, e.SubmitJob) + } else { + break + } + } + return result +} + +func collectCancelJobEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.CancelJob { + result := make([]*events.CancelJob, 0) + for j := i; j < len(sequence.Events); j++ { + if e, ok := sequence.Events[j].Event.(*events.EventSequence_Event_CancelJob); ok { + result = append(result, e.CancelJob) + } else { + break + } + } + return result +} + +func collectCancelJobSetEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.CancelJobSet { + result := make([]*events.CancelJobSet, 0) + for j := i; j < len(sequence.Events); j++ { + if e, ok := sequence.Events[j].Event.(*events.EventSequence_Event_CancelJobSet); ok { + result = append(result, e.CancelJobSet) + } else { + break + } + } + return result +} + +func collectReprioritiseJobEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.ReprioritiseJob { + result := make([]*events.ReprioritiseJob, 0) + for j := i; j < len(sequence.Events); j++ { + if e, ok := sequence.Events[j].Event.(*events.EventSequence_Event_ReprioritiseJob); ok { + result = append(result, e.ReprioritiseJob) + } else { + break + } + } + return result +} + +func collectReprioritiseJobSetEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.ReprioritiseJobSet { + result := make([]*events.ReprioritiseJobSet, 0) + for j := i; j < len(sequence.Events); j++ { + if e, ok := sequence.Events[j].Event.(*events.EventSequence_Event_ReprioritiseJobSet); ok { + result = append(result, e.ReprioritiseJobSet) + } else { + break + } + } + return result +} + func (srv *SubmitFromLog) SubmitJobs(ctx context.Context, userId string, groups []string, queueName string, jobSetName string, es []*events.SubmitJob) error { jobs, err := apiJobsFromLogSubmitJobs(userId, groups, queueName, jobSetName, time.Now(), es) @@ -372,6 +503,13 @@ func (srv *SubmitFromLog) CancelJobs(ctx context.Context, userId string, es []*e return err } +// CancelJobSets processes several CancelJobSet events. +// Because event sequences are specific to queue and job set, all CancelJobSet events in a sequence are equivalent, +// and we only need to call CancelJobSet once. +func (srv *SubmitFromLog) CancelJobSets(ctx context.Context, queueName string, jobSetName string, es []*events.CancelJobSet) error { + return srv.CancelJobSet(ctx, queueName, jobSetName) +} + func (srv *SubmitFromLog) CancelJobSet(ctx context.Context, queueName string, jobSetName string) error { jobIds, err := srv.SubmitServer.jobRepository.GetActiveJobIds(queueName, jobSetName)