Skip to content

Commit

Permalink
chore: better logging in dispatcher (#692)
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Jul 4, 2024
1 parent 36b6640 commit b96cee1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
5 changes: 5 additions & 0 deletions internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,11 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms
return fmt.Errorf("could not get worker: %w", err)
}

if len(workers) == 0 {
d.l.Warn().Msgf("worker %s not found, ignoring task for step run %s", payload.WorkerId, payload.StepRunId)
return nil
}

// load the step run from the database
stepRun, err := d.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId)

Expand Down
12 changes: 9 additions & 3 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,13 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
_, err = s.repo.Worker().UpdateWorkerActiveStatus(ctx, tenantId, request.WorkerId, true, sessionEstablished)

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId)
lastSessionEstablished := "NULL"

if worker.LastListenerEstablished.Valid {
lastSessionEstablished = worker.LastListenerEstablished.Time.String()
}

s.l.Error().Err(err).Msgf("could not update worker %s active status to true (session established %s, last session established %s)", request.WorkerId, sessionEstablished.String(), lastSessionEstablished)
return err
}

Expand All @@ -323,7 +329,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
_, err = s.repo.Worker().UpdateWorkerActiveStatus(ctx, tenantId, request.WorkerId, false, sessionEstablished)

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId)
s.l.Error().Err(err).Msgf("could not update worker %s active status to false due to worker stream closing (session established %s)", request.WorkerId, sessionEstablished.String())
return err
}

Expand All @@ -337,7 +343,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
_, err = s.repo.Worker().UpdateWorkerActiveStatus(ctx, tenantId, request.WorkerId, false, sessionEstablished)

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId)
s.l.Error().Err(err).Msgf("could not update worker %s active status due to worker disconnecting (session established %s)", request.WorkerId, sessionEstablished.String())
return err
}

Expand Down

0 comments on commit b96cee1

Please sign in to comment.