Skip to content

Commit

Permalink
Improve logging in ingester pipeline
Browse files Browse the repository at this point in the history
This should help us understand what is happening in our ingestion pipelines

 - Should log if we are no longer receiving pulsar messages for 2mins
 - Will log a summary of how many messages and event in each "batch"
 - Will log a summary of the types of events in each batch
 - Will log a summary of how long Convert took for each batch

This is admittedly quite a "quick" fix and better long term steps would be:
 - Metrics or spans
  - Some of these could be at the ingseter pipeline level (generic)
  - Some would need to be done in each ingester to expose more detailed information such as which query is all the time being spent on

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
  • Loading branch information
JamesMurkin committed Jun 19, 2024
1 parent bc755c3 commit d410890
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 3 deletions.
58 changes: 55 additions & 3 deletions internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,38 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {
i.consumer = consumer
defer closePulsar()
}
pulsarMsgs := i.consumer.Chan()
pulsarMessageChannel := i.consumer.Chan()
pulsarMessages := make(chan pulsar.ConsumerMessage)

// Consume pulsar messages
// Used to track if we are no longer receiving pulsar messages
go func() {
timeout := time.Minute * 2
timer := time.NewTimer(timeout)
loop:
for {
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)
select {
case msg, ok := <-pulsarMessageChannel:
if !ok {
// Channel closed
break loop
}
pulsarMessages <- msg
case <-timer.C:
log.Infof("No pulsar message received in %s", timeout)
}
}
close(pulsarMessages)
}()

// Convert to event sequences
eventSequences := make(chan *EventSequencesWithIds)
go func() {
for msg := range pulsarMsgs {
for msg := range pulsarMessages {
converted := unmarshalEventSequences(msg, i.metrics)
eventSequences <- converted
}
Expand All @@ -131,11 +157,24 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {
close(batchedEventSequences)
}()

// Log summary of batch
preprocessedBatchEventSequences := make(chan *EventSequencesWithIds)
go func() {
for msg := range batchedEventSequences {
logSummaryOfEventSequences(msg)
preprocessedBatchEventSequences <- msg
}
close(preprocessedBatchEventSequences)
}()

// Convert to instructions
instructions := make(chan T)
go func() {
for msg := range batchedEventSequences {
for msg := range preprocessedBatchEventSequences {
start := time.Now()
converted := i.converter.Convert(ctx, msg)
taken := time.Now().Sub(start)
log.Infof("Processed %d pulsar messages in %dms", len(msg.MessageIds), taken.Milliseconds())
instructions <- converted
}
close(instructions)
Expand Down Expand Up @@ -244,3 +283,16 @@ func combineEventSequences(sequences []*EventSequencesWithIds) *EventSequencesWi
EventSequences: combinedSequences, MessageIds: messageIds,
}
}

func logSummaryOfEventSequences(sequence *EventSequencesWithIds) {
numberOfEvents := 0
countOfEventsByType := map[string]int{}
for _, eventSequence := range sequence.EventSequences {
numberOfEvents += len(eventSequence.Events)
for _, e := range eventSequence.Events {
typeString := e.GetEventName()
countOfEventsByType[typeString] = countOfEventsByType[typeString] + 1
}
}
log.Infof("Batch being processed contains %d event messages and %d events of type %v", len(sequence.MessageIds), numberOfEvents, countOfEventsByType)
}
1 change: 1 addition & 0 deletions internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (c *InstructionConverter) Convert(_ *armadacontext.Context, sequencesWithId
operations = AppendDbOperation(operations, op)
}
}
log.Infof("Converted sequences into %d db operations", len(operations))
return &DbOperationsWithMessageIds{
Ops: operations,
MessageIds: sequencesWithIds.MessageIds,
Expand Down
52 changes: 52 additions & 0 deletions pkg/armadaevents/events_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,58 @@ func (ev *EventSequence_Event) UnmarshalJSON(data []byte) error {
return nil
}

func (ev *EventSequence_Event) GetEventName() string {
switch ev.GetEvent().(type) {
case *EventSequence_Event_SubmitJob:
return "SubmitJob"
case *EventSequence_Event_JobRunLeased:
return "JobRunLeased"
case *EventSequence_Event_JobRunRunning:
return "JobRunRunning"
case *EventSequence_Event_JobRunSucceeded:
return "JobRunSucceeded"
case *EventSequence_Event_JobRunErrors:
return "JobRunErrors"
case *EventSequence_Event_JobSucceeded:
return "JobSucceeded"
case *EventSequence_Event_JobErrors:
return "JobErrors"
case *EventSequence_Event_JobPreemptionRequested:
return "JobPreemptionRequested"
case *EventSequence_Event_JobRunPreemptionRequested:
return "JobRunPreemptionRequested"
case *EventSequence_Event_ReprioritiseJob:
return "ReprioritiseJob"
case *EventSequence_Event_ReprioritiseJobSet:
return "ReprioritiseJobSet"
case *EventSequence_Event_CancelJob:
return "CancelJob"
case *EventSequence_Event_CancelJobSet:
return "CancelJobSet"
case *EventSequence_Event_CancelledJob:
return "CancelledJob"
case *EventSequence_Event_JobRunCancelled:
return "JobRunCancelled"
case *EventSequence_Event_JobRequeued:
return "JobRequeued"
case *EventSequence_Event_PartitionMarker:
return "PartitionMarker"
case *EventSequence_Event_JobRunPreempted:
return "JobRunPreemped"
case *EventSequence_Event_JobRunAssigned:
return "JobRunAssigned"
case *EventSequence_Event_JobValidated:
return "JobValidated"
case *EventSequence_Event_ReprioritisedJob:
return "ReprioritisedJob"
case *EventSequence_Event_ResourceUtilisation:
return "ResourceUtilisation"
case *EventSequence_Event_StandaloneIngressInfo:
return "StandloneIngressIngo"
}
return "Unknown"
}

func (kmo *KubernetesMainObject) UnmarshalJSON(data []byte) error {
if string(data) == "null" || string(data) == `""` {
return nil
Expand Down

0 comments on commit d410890

Please sign in to comment.