Skip to content

Commit

Permalink
Preserve ordering within sequences (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
Albin Severinson authored and Gary Conway committed Mar 25, 2022
1 parent 0ec0f8b commit 8de1a7a
Showing 1 changed file with 193 additions and 55 deletions.
248 changes: 193 additions & 55 deletions internal/armada/server/submit_from_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"reflect"
"time"

"github.com/apache/pulsar-client-go/pulsar"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (srv *SubmitFromLog) Run(ctx context.Context) {
lastLogged = time.Now()
}

// Check if the context has been cancelled, and, if not, get a message from Pulsar.
// Exit if the context has been cancelled. Otherwise, get a message from Pulsar.
select {
case <-ctx.Done():
log.WithFields(
Expand All @@ -89,14 +90,18 @@ func (srv *SubmitFromLog) Run(ctx context.Context) {
},
).Info("shutdown signal received")
break
default: // Get a message from Pulsar
default:

// Get a message from Pulsar, which consists of a sequence of events (i.e., state transitions).
ctxWithTimeout, _ := context.WithTimeout(ctx, time.Second)
msg, err := srv.Consumer.Receive(ctxWithTimeout)
if errors.Is(err, context.DeadlineExceeded) {
continue //expected
}

// If receiving fails, wait for a bit, since the problem may be transient.
if err != nil {
log.WithField("lastMessageId", lastMessageId).WithError(err).Warnf("Pulsar receive failed; ignoring")
log.WithField("lastMessageId", lastMessageId).WithError(err).Warnf("Pulsar receive failed; backing off")
time.Sleep(time.Second)
continue
}
Expand All @@ -113,34 +118,51 @@ func (srv *SubmitFromLog) Run(ctx context.Context) {
}
}

// TODO: Determine what to do based on the error (e.g., publish to a dead letter topic).
// Put the requestId into a message-specific context and logger,
// which are passed on to sub-functions.
messageCtx, ok := requestid.AddToIncomingContext(ctx, requestId)
if !ok {
messageCtx = ctx
}
messageLogger := log.WithFields(logrus.Fields{"messageId": msg.ID(), requestid.MetadataKey: requestId})
ctxWithLogger := ctxlogrus.ToContext(messageCtx, messageLogger)
err = srv.ProcessMessage(ctxWithLogger, msg)

// Unmarshal and validate the message.
sequence, err := srv.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
if err != nil {
messageLogger.WithError(err).Warnf("processing message failed; ignoring")
time.Sleep(time.Second)
numErrored++
continue
}

// Process the events in the sequence.
// For efficiency, we may process several events at a time.
// To maintain ordering, we only do so for subsequences of consecutive events with equal type.
// TODO: Determine what to do based on the error (e.g., publish to a dead letter topic).
i := 0
for i < len(sequence.Events) {
j, err := srv.ProcessSubSequence(ctxWithLogger, i, sequence)
if err != nil {
messageLogger.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).WithError(err).Warnf("processing subsequence failed; ignoring")
numErrored++
}
if j == i {
j++ // Make sure we make progress in case of bugs in srv.ProcessSubSequence
}
i = j
}
}
}
}

// ProcessMessage processes a single Pulsar message (i.e., an event sequence)
// by decomposing the events in the sequence by type and calling into the embedded SubmitServer.
func (srv *SubmitFromLog) ProcessMessage(ctx context.Context, msg pulsar.Message) (err error) {
log := ctxlogrus.Extract(ctx)
log.Info("processing Pulsar message")

// UnmarshalEventSequence returns an EventSequence object contained in a byte buffer
// after validating that the resulting EventSequence is valid.
func (srv *SubmitFromLog) UnmarshalEventSequence(ctx context.Context, payload []byte) (*events.EventSequence, error) {
sequence := &events.EventSequence{}
err = proto.Unmarshal(msg.Payload(), sequence)
err := proto.Unmarshal(payload, sequence)
if err != nil {
err = errors.WithStack(err)
return
return nil, err
}

if sequence.JobSetName == "" {
Expand All @@ -150,74 +172,183 @@ func (srv *SubmitFromLog) ProcessMessage(ctx context.Context, msg pulsar.Message
Message: "JobSetName not provided",
}
err = errors.WithStack(err)
return
return nil, err
}

if sequence.UserId == "" {
err = &armadaerrors.ErrInvalidArgument{
Name: "UserId",
Value: "",
Message: "UserId not provided",
}
err = errors.WithStack(err)
return
return nil, err
}

if sequence.Queue == "" {
err = &armadaerrors.ErrInvalidArgument{
Name: "Queue",
Value: "",
Message: "Queue name not provided",
}
err = errors.WithStack(err)
return
return nil, err
}

if sequence.Groups == nil {
sequence.Groups = make([]string, 0)
}

// Split the event sequence by type
submitJobEvents := make([]*events.SubmitJob, 0)
cancelJobEvents := make([]*events.CancelJob, 0)
reprioritiseJobEvents := make([]*events.ReprioritiseJob, 0)
reprioritiseJobSetEvents := make([]*events.ReprioritiseJobSet, 0)
for i, event := range sequence.Events {
switch e := event.Event.(type) {
case *events.EventSequence_Event_SubmitJob:
submitJobEvents = append(submitJobEvents, e.SubmitJob)
case *events.EventSequence_Event_CancelJob:
cancelJobEvents = append(cancelJobEvents, e.CancelJob)
case *events.EventSequence_Event_ReprioritiseJob:
reprioritiseJobEvents = append(reprioritiseJobEvents, e.ReprioritiseJob)
case *events.EventSequence_Event_ReprioritiseJobSet:
reprioritiseJobSetEvents = append(reprioritiseJobSetEvents, e.ReprioritiseJobSet)
default:
log.WithFields(logrus.Fields{"index": i, "event": event}).Warn("received unsupported Pulsar event")
if sequence.Events == nil {
err = &armadaerrors.ErrInvalidArgument{
Name: "Events",
Value: nil,
Message: "no events in sequence",
}
err = errors.WithStack(err)
return nil, err
}
return sequence, nil
}

// Submit to Redis and Nats.
// Collect all errors as sibling errors.
var result *multierror.Error
result = multierror.Append(
result,
srv.SubmitJobs(ctx, sequence.UserId, sequence.Groups, sequence.Queue, sequence.JobSetName, submitJobEvents),
)
result = multierror.Append(
result,
srv.CancelJobs(ctx, sequence.UserId, cancelJobEvents),
)
result = multierror.Append(
result,
srv.ReprioritizeJobs(ctx, sequence.UserId, reprioritiseJobEvents),
)
result = multierror.Append(
result,
srv.ReprioritizeJobSets(ctx, sequence.UserId, sequence.Queue, sequence.JobSetName, reprioritiseJobSetEvents),
)

err = result.ErrorOrNil()
// ProcessSubSequence processes the run of consecutive events sequence.Events[i:j]
// (i inclusive, j exclusive), where j is the index of the first event after index i
// whose type differs from that of sequence.Events[i], or len(sequence.Events) if no
// such event exists, and returns j.
//
// Processing one such subsequence at a time preserves ordering between events of different types.
// For example, SubmitJob events are processed before CancelJob events that occur later in the sequence.
//
// Events are processed by calling into the embedded srv.SubmitServer.
//
// An error is returned if sequence is nil, if i is out of range, if the event at
// index i is nil or of an unsupported type, or if processing the subsequence fails.
// The returned j is always at least i+1, so a caller that only logs the error and
// continues from j still makes progress.
func (srv *SubmitFromLog) ProcessSubSequence(ctx context.Context, i int, sequence *events.EventSequence) (j int, err error) {
	// Default to the next index so that processing continues in case of unhandled errors at the caller.
	j = i + 1

	// Guard against a nil sequence; len(sequence.Events) below would otherwise panic.
	if sequence == nil {
		err = &armadaerrors.ErrInvalidArgument{
			Name:    "sequence",
			Value:   nil,
			Message: "sequence is nil",
		}
		return j, errors.WithStack(err)
	}

	if i < 0 || i >= len(sequence.Events) {
		err = &armadaerrors.ErrInvalidArgument{
			Name:    "i",
			Value:   i,
			Message: fmt.Sprintf("tried to index into a list composed of %d elements", len(sequence.Events)),
		}
		return j, errors.WithStack(err)
	}

	// Guard against a nil entry; accessing its Event field below would otherwise panic.
	if sequence.Events[i] == nil {
		err = &armadaerrors.ErrInvalidArgument{
			Name:    fmt.Sprintf("Events[%d]", i),
			Value:   nil,
			Message: "event is nil",
		}
		return j, errors.WithStack(err)
	}

	// Process the subsequence starting at the i-th event consisting of all consecutive events of the same type.
	// Each collect* helper returns at least one event here, since the event at index i has the matched type.
	switch sequence.Events[i].Event.(type) {
	case *events.EventSequence_Event_SubmitJob:
		es := collectJobSubmitEvents(ctx, i, sequence)
		j = i + len(es)
		err = srv.SubmitJobs(ctx, sequence.UserId, sequence.Groups, sequence.Queue, sequence.JobSetName, es)
	case *events.EventSequence_Event_CancelJob:
		es := collectCancelJobEvents(ctx, i, sequence)
		j = i + len(es)
		err = srv.CancelJobs(ctx, sequence.UserId, es)
	case *events.EventSequence_Event_CancelJobSet:
		es := collectCancelJobSetEvents(ctx, i, sequence)
		j = i + len(es)
		err = srv.CancelJobSets(ctx, sequence.Queue, sequence.JobSetName, es)
	case *events.EventSequence_Event_ReprioritiseJob:
		es := collectReprioritiseJobEvents(ctx, i, sequence)
		j = i + len(es)
		err = srv.ReprioritizeJobs(ctx, sequence.UserId, es)
	case *events.EventSequence_Event_ReprioritiseJobSet:
		es := collectReprioritiseJobSetEvents(ctx, i, sequence)
		j = i + len(es)
		err = srv.ReprioritizeJobSets(ctx, sequence.UserId, sequence.Queue, sequence.JobSetName, es)
	// Events not handled by this processor. Since the legacy scheduler writes these transitions directly to the db.
	//case *events.EventSequence_Event_JobSucceeded:
	//case *events.EventSequence_Event_JobFailed:
	//case *events.EventSequence_Event_JobRejected:
	//case *events.EventSequence_Event_JobRunLeased:
	//case *events.EventSequence_Event_JobRunAssigned:
	//case *events.EventSequence_Event_JobRunRunning:
	//case *events.EventSequence_Event_JobRunReturned:
	//case *events.EventSequence_Event_JobRunSucceeded:
	//case *events.EventSequence_Event_JobRunFailed:
	default:
		err = &armadaerrors.ErrInvalidArgument{
			Name:    fmt.Sprintf("Events[%d]", i),
			Value:   sequence.Events[i],
			Message: "received unsupported Pulsar event",
		}

		// Assign to j the index of the next event in the sequence with type different from sequence.Events[i],
		// or len(sequence.Events) if no such element exists, so that processing won't be attempted for this type again.
		// The scan starts at i+1 because the event at index i trivially matches its own type.
		t := reflect.TypeOf(sequence.Events[i].Event)
		j = i + 1
		for j < len(sequence.Events) && sequence.Events[j] != nil && reflect.TypeOf(sequence.Events[j].Event) == t {
			j++
		}
	}

	// errors.WithStack returns nil for a nil error, so the success path still returns a nil error.
	return j, errors.WithStack(err)
}

// collectJobSubmitEvents gathers the SubmitJob payloads of the run of consecutive
// SubmitJob events that begins at index i of sequence.Events. Collection stops at
// the first event of another type or at the end of the sequence. The result is an
// empty, non-nil slice if the event at index i is not a SubmitJob event or if i is
// at or past the end of the sequence.
//
// The corresponding functions for the other event types below behave the same way.
func collectJobSubmitEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.SubmitJob {
	collected := make([]*events.SubmitJob, 0)
	for idx := i; idx < len(sequence.Events); idx++ {
		wrapper, matches := sequence.Events[idx].Event.(*events.EventSequence_Event_SubmitJob)
		if !matches {
			// First event of a different type ends the run.
			return collected
		}
		collected = append(collected, wrapper.SubmitJob)
	}
	return collected
}

// collectCancelJobEvents gathers the CancelJob payloads of the run of consecutive
// CancelJob events that begins at index i of sequence.Events, stopping at the first
// event of another type or at the end of the sequence.
func collectCancelJobEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.CancelJob {
	collected := make([]*events.CancelJob, 0)
	for idx := i; idx < len(sequence.Events); idx++ {
		wrapper, matches := sequence.Events[idx].Event.(*events.EventSequence_Event_CancelJob)
		if !matches {
			// First event of a different type ends the run.
			return collected
		}
		collected = append(collected, wrapper.CancelJob)
	}
	return collected
}

// collectCancelJobSetEvents gathers the CancelJobSet payloads of the run of
// consecutive CancelJobSet events that begins at index i of sequence.Events,
// stopping at the first event of another type or at the end of the sequence.
func collectCancelJobSetEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.CancelJobSet {
	collected := make([]*events.CancelJobSet, 0)
	for idx := i; idx < len(sequence.Events); idx++ {
		wrapper, matches := sequence.Events[idx].Event.(*events.EventSequence_Event_CancelJobSet)
		if !matches {
			// First event of a different type ends the run.
			return collected
		}
		collected = append(collected, wrapper.CancelJobSet)
	}
	return collected
}

// collectReprioritiseJobEvents gathers the ReprioritiseJob payloads of the run of
// consecutive ReprioritiseJob events that begins at index i of sequence.Events,
// stopping at the first event of another type or at the end of the sequence.
func collectReprioritiseJobEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.ReprioritiseJob {
	collected := make([]*events.ReprioritiseJob, 0)
	for idx := i; idx < len(sequence.Events); idx++ {
		wrapper, matches := sequence.Events[idx].Event.(*events.EventSequence_Event_ReprioritiseJob)
		if !matches {
			// First event of a different type ends the run.
			return collected
		}
		collected = append(collected, wrapper.ReprioritiseJob)
	}
	return collected
}

// collectReprioritiseJobSetEvents gathers the ReprioritiseJobSet payloads of the run
// of consecutive ReprioritiseJobSet events that begins at index i of sequence.Events,
// stopping at the first event of another type or at the end of the sequence.
func collectReprioritiseJobSetEvents(ctx context.Context, i int, sequence *events.EventSequence) []*events.ReprioritiseJobSet {
	collected := make([]*events.ReprioritiseJobSet, 0)
	for idx := i; idx < len(sequence.Events); idx++ {
		wrapper, matches := sequence.Events[idx].Event.(*events.EventSequence_Event_ReprioritiseJobSet)
		if !matches {
			// First event of a different type ends the run.
			return collected
		}
		collected = append(collected, wrapper.ReprioritiseJobSet)
	}
	return collected
}

func (srv *SubmitFromLog) SubmitJobs(ctx context.Context, userId string, groups []string, queueName string, jobSetName string, es []*events.SubmitJob) error {

jobs, err := apiJobsFromLogSubmitJobs(userId, groups, queueName, jobSetName, time.Now(), es)
Expand Down Expand Up @@ -372,6 +503,13 @@ func (srv *SubmitFromLog) CancelJobs(ctx context.Context, userId string, es []*e
return err
}

// CancelJobSets processes several CancelJobSet events.
// Because event sequences are specific to queue and job set, all CancelJobSet events in a sequence are equivalent,
// and we only need to call CancelJobSet once.
//
// If es is empty there is nothing to process, so no cancellation is performed and nil is returned.
// Otherwise the result of the single srv.CancelJobSet call is returned.
func (srv *SubmitFromLog) CancelJobSets(ctx context.Context, queueName string, jobSetName string, es []*events.CancelJobSet) error {
	// An empty batch must not cancel anything; only the presence of at least one event triggers cancellation.
	if len(es) == 0 {
		return nil
	}
	return srv.CancelJobSet(ctx, queueName, jobSetName)
}

func (srv *SubmitFromLog) CancelJobSet(ctx context.Context, queueName string, jobSetName string) error {

jobIds, err := srv.SubmitServer.jobRepository.GetActiveJobIds(queueName, jobSetName)
Expand Down

0 comments on commit 8de1a7a

Please sign in to comment.