Skip to content

Commit

Permalink
Revert "Limit Pulsar messages to have a configurable max number of ev…
Browse files Browse the repository at this point in the history
…ents per message"

This reverts commit 11a8a2a.
  • Loading branch information
JamesMurkin committed Jun 19, 2024
1 parent 11a8a2a commit bc755c3
Show file tree
Hide file tree
Showing 10 changed files with 5 additions and 49 deletions.
1 change: 0 additions & 1 deletion config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ pulsar:
compressionLevel: faster
eventsPrinter: false
eventsPrinterSubscription: "EventsPrinter"
maxAllowedEventsPerMessage: 1000
maxAllowedMessageSize: 4194304 # 4MB
receiverQueueSize: 100
postgres:
Expand Down
1 change: 0 additions & 1 deletion config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pulsar:
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
maxAllowedEventsPerMessage: 1000
maxAllowedMessageSize: 4194304 #4Mi
armadaApi:
armadaUrl: "server:50051"
Expand Down
2 changes: 0 additions & 2 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ type PulsarConfig struct {
CompressionLevel pulsar.CompressionLevel
// Settings for deduplication, which relies on a postgres server.
DedupTable string
// Maximum allowed Events per message
MaxAllowedEventsPerMessage int `validate:"gte=0"`
// Maximum allowed message size in bytes
MaxAllowedMessageSize uint
// Timeout when polling pulsar for messages
Expand Down
2 changes: 1 addition & 1 deletion internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
CompressionLevel: config.Pulsar.CompressionLevel,
BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize,
Topic: config.Pulsar.JobsetEventsTopic,
}, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize)
}, config.Pulsar.MaxAllowedMessageSize)
if err != nil {
return errors.Wrapf(err, "error creating pulsar producer")
}
Expand Down
24 changes: 0 additions & 24 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
)
Expand Down Expand Up @@ -254,29 +253,6 @@ func groupsEqual(g1, g2 []string) bool {
return true
}

func LimitSequencesEventMessageCount(sequences []*armadaevents.EventSequence, maxEventsPerSequence int) []*armadaevents.EventSequence {
rv := make([]*armadaevents.EventSequence, 0, len(sequences))
for _, sequence := range sequences {
if len(sequence.Events) > maxEventsPerSequence {
splitEventMessages := slices.PartitionToMaxLen(sequence.Events, maxEventsPerSequence)

for _, eventMessages := range splitEventMessages {
rv = append(rv, &armadaevents.EventSequence{
Queue: sequence.Queue,
JobSetName: sequence.JobSetName,
UserId: sequence.UserId,
Groups: sequence.Groups,
Events: eventMessages,
})
}

} else {
rv = append(rv, sequence)
}
}
return rv
}

// LimitSequencesByteSize calls LimitSequenceByteSize for each of the provided sequences
// and returns all resulting sequences.
func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
Expand Down
6 changes: 2 additions & 4 deletions internal/common/pulsarutils/eventsequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ import (

// CompactAndPublishSequences reduces the number of sequences to the smallest possible,
// while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar.
func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxEventsPerMessage int, maxMessageSizeInBytes uint) error {
func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes uint) error {
// Reduce the number of sequences to send to the minimum possible,
// and then break up any sequences larger than maxMessageSizeInBytes.
sequences = eventutil.CompactEventSequences(sequences)
// Limit each sequence to have no more than maxEventsPerSequence events per sequence
sequences = eventutil.LimitSequencesEventMessageCount(sequences, maxEventsPerMessage)
// Limit each sequence to be no larger than maxMessageSizeInBytes bytes
sequences, err := eventutil.LimitSequencesByteSize(sequences, maxMessageSizeInBytes, true)
if err != nil {
return err
Expand Down
5 changes: 0 additions & 5 deletions internal/common/pulsarutils/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ type Publisher interface {
type PulsarPublisher struct {
// Used to send messages to pulsar
producer pulsar.Producer
// Maximum number of Events in each EventSequence
maxEventsPerMessage int
// Maximum size (in bytes) of produced pulsar messages.
// This must be below 4MB which is the pulsar message size limit
maxAllowedMessageSize uint
Expand All @@ -28,7 +26,6 @@ type PulsarPublisher struct {
func NewPulsarPublisher(
pulsarClient pulsar.Client,
producerOptions pulsar.ProducerOptions,
maxEventsPerMessage int,
maxAllowedMessageSize uint,
) (*PulsarPublisher, error) {
producer, err := pulsarClient.CreateProducer(producerOptions)
Expand All @@ -37,7 +34,6 @@ func NewPulsarPublisher(
}
return &PulsarPublisher{
producer: producer,
maxEventsPerMessage: maxEventsPerMessage,
maxAllowedMessageSize: maxAllowedMessageSize,
}, nil
}
Expand All @@ -49,7 +45,6 @@ func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, es *armada
ctx,
[]*armadaevents.EventSequence{es},
p.producer,
p.maxEventsPerMessage,
p.maxAllowedMessageSize)
}

Expand Down
5 changes: 1 addition & 4 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type ExecutorApi struct {
allowedPriorities []int32
// Known priority classes
priorityClasses map[string]priorityTypes.PriorityClass
// Max number of events in published Pulsar messages
maxEventsPerPulsarMessage int
// Max size of Pulsar messages produced.
maxPulsarMessageSizeBytes uint
// See scheduling schedulingConfig.
Expand All @@ -58,7 +56,6 @@ func NewExecutorApi(producer pulsar.Producer,
nodeIdLabel string,
priorityClassNameOverride *string,
priorityClasses map[string]priorityTypes.PriorityClass,
maxEventsPerPulsarMessage int,
maxPulsarMessageSizeBytes uint,
) (*ExecutorApi, error) {
if len(allowedPriorities) == 0 {
Expand Down Expand Up @@ -313,7 +310,7 @@ func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string)
// ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing.
func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error) {
ctx := armadacontext.FromGrpcCtx(grpcCtx)
err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxEventsPerPulsarMessage, srv.maxPulsarMessageSizeBytes)
err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxPulsarMessageSizeBytes)
return &types.Empty{}, err
}

Expand Down
5 changes: 0 additions & 5 deletions internal/scheduler/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ type PulsarPublisher struct {
numPartitions int
// Timeout after which async messages sends will be considered failed
pulsarSendTimeout time.Duration
// Maximum number of Events in each EventSequence
maxEventsPerMessage int
// Maximum size (in bytes) of produced pulsar messages.
// This must be below 4MB which is the pulsar message size limit
maxMessageBatchSize uint
Expand All @@ -53,7 +51,6 @@ type PulsarPublisher struct {
func NewPulsarPublisher(
pulsarClient pulsar.Client,
producerOptions pulsar.ProducerOptions,
maxEventsPerMessage int,
pulsarSendTimeout time.Duration,
) (*PulsarPublisher, error) {
partitions, err := pulsarClient.TopicPartitions(producerOptions.Topic)
Expand All @@ -72,7 +69,6 @@ func NewPulsarPublisher(
return &PulsarPublisher{
producer: producer,
pulsarSendTimeout: pulsarSendTimeout,
maxEventsPerMessage: maxEventsPerMessage,
maxMessageBatchSize: maxMessageBatchSize,
numPartitions: len(partitions),
}, nil
Expand All @@ -82,7 +78,6 @@ func NewPulsarPublisher(
// single event sequences up to maxMessageBatchSize.
func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error {
sequences := eventutil.CompactEventSequences(events)
sequences = eventutil.LimitSequencesEventMessageCount(sequences, p.maxEventsPerMessage)
sequences, err := eventutil.LimitSequencesByteSize(sequences, p.maxMessageBatchSize, true)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Run(config schedulerconfig.Configuration) error {
CompressionLevel: config.Pulsar.CompressionLevel,
BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize,
Topic: config.Pulsar.JobsetEventsTopic,
}, config.Pulsar.MaxAllowedEventsPerMessage, config.PulsarSendTimeout)
}, config.PulsarSendTimeout)
if err != nil {
return errors.WithMessage(err, "error creating pulsar publisher")
}
Expand Down Expand Up @@ -182,7 +182,6 @@ func Run(config schedulerconfig.Configuration) error {
config.Scheduling.NodeIdLabel,
config.Scheduling.PriorityClassNameOverride,
config.Scheduling.PriorityClasses,
config.Pulsar.MaxAllowedEventsPerMessage,
config.Pulsar.MaxAllowedMessageSize,
)
if err != nil {
Expand Down

0 comments on commit bc755c3

Please sign in to comment.