Skip to content

Commit

Permalink
feat: add log fields
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed Mar 8, 2024
1 parent 4850fce commit 19b1b58
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions integration/watermill/keel/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Event) e
}

msg := message.NewMessage(s.uuidFunc(), payload)
l = l.With(zap.String("message_id", msg.UUID))
if labeler, ok := keellog.LabelerFromRequest(r); ok {
labeler.Add(zap.String("message_id", msg.UUID))
}
Expand All @@ -174,6 +175,10 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Event) e
msg.Metadata.Set(name, strings.Join(headers, ","))
}

for k, v := range msg.Metadata {
l = l.With(zap.String(k, v))
}

// TODO different context?
ctx, cancelCtx := context.WithCancel(r.Context())
msg.SetContext(ctx)
Expand All @@ -185,13 +190,13 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Event) e
// wait for ACK
select {
case <-msg.Acked():
l.Info("message acked")
l.Debug("message acked")
return nil
case <-msg.Nacked():
l.Info("message nacked")
l.Debug("message nacked")
return ErrMessageNacked
case <-r.Context().Done():
l.Info("message cancled")
l.Debug("message cancled")
return ErrContextCanceled
}
}
Expand Down

0 comments on commit 19b1b58

Please sign in to comment.