Skip to content

Commit

Permalink
add helper functions for creating publish spans
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Aug 23, 2023
1 parent f3938f2 commit ca01559
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 25 deletions.
42 changes: 18 additions & 24 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,7 @@ var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false,
// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
// will immediately return a PublishResult with an error.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
opts := getPublishSpanAttributes(t.String(), msg)
ctx, span := tracer().Start(ctx, fmt.Sprintf("%s %s", t.String(), publisherSpanName), opts...)
ctx, publishSpan := startPublishSpan(ctx, msg, t.String())
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
Expand All @@ -582,7 +581,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
r := ipubsub.NewPublishResult()
if !t.EnableMessageOrdering && msg.OrderingKey != "" {
ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
spanRecordError(span, errTopicOrderingNotEnabled)
spanRecordError(publishSpan, errTopicOrderingNotEnabled)
return r
}

Expand All @@ -593,49 +592,44 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
Attributes: msg.Attributes,
OrderingKey: msg.OrderingKey,
})
span.SetAttributes(semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize))
publishSpan.SetAttributes(semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize))

t.initBundler()
t.mu.RLock()
defer t.mu.RUnlock()
// TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
if t.stopped {
ipubsub.SetPublishResult(r, "", ErrTopicStopped)
spanRecordError(span, ErrTopicStopped)
spanRecordError(publishSpan, ErrTopicStopped)
return r
}

ctx2, fcSpan := tracer().Start(ctx, publishFlowControlSpanName)
ctx2, fcSpan := startPublishFlowControlSpan(ctx)
if err := t.flowController.acquire(ctx2, msgSize); err != nil {
t.scheduler.Pause(msg.OrderingKey)
ipubsub.SetPublishResult(r, "", err)
spanRecordError(span, err)
spanRecordError(fcSpan, err)
return r
}
fcSpan.End()

_, batchSpan := tracer().Start(ctx2, publishSchedulerSpanName)
_, batcherSpan := startBatcherSpan(ctx)

bmsg := &bundledMessage{
msg: msg,
res: r,
size: msgSize,
span: span,
batchSpan: batchSpan,
msg: msg,
res: r,
size: msgSize,
span: publishSpan,
batcherSpan: batcherSpan,
}

if span.SpanContext().IsValid() {
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
// Inject the context from the first publish span rather than from flow control / batching.
otel.GetTextMapPropagator().Inject(ctx, newMessageCarrier(msg))
}
// Inject the context from the first publish span rather than from flow control / batching.
injectPropagation(ctx, msg)

if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil {
t.scheduler.Pause(msg.OrderingKey)
ipubsub.SetPublishResult(r, "", err)
spanRecordError(span, err)
spanRecordError(publishSpan, err)
}

return r
Expand Down Expand Up @@ -669,8 +663,8 @@ type bundledMessage struct {
size int
// span is the entire publish span (from user calling Publish to the publish RPC resolving).
span trace.Span
// batchSpan traces the message batching operation in publish scheduler.
batchSpan trace.Span
// batcherSpan traces the message batching operation in publish scheduler.
batcherSpan trace.Span
}

func (t *Topic) initBundler() {
Expand Down Expand Up @@ -707,7 +701,7 @@ func (t *Topic) initBundler() {
}
bmsgs := bundle.([]*bundledMessage)
for _, m := range bmsgs {
m.batchSpan.End()
m.batcherSpan.End()
}
t.publishMessageBundle(ctx2, bmsgs)
})
Expand Down
30 changes: 29 additions & 1 deletion pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pubsub

import (
"context"
"fmt"
"log"
"sync"

Expand Down Expand Up @@ -309,14 +310,27 @@ const (
// span names
publisherSpanName = "send"
publishFlowControlSpanName = "publisher flow control"
publishSchedulerSpanName = "publish scheduler"
publishBatcherSpanName = "publish batcher"
publishRPCSpanName = "publish"

// custom pubsub specific attributes
numBatchedMessagesAttribute = "messaging.pubsub.num_messages_in_batch"
orderingAttribute = "messaging.pubsub.ordering_key"
)

func startPublishSpan(ctx context.Context, m *Message, topicName string) (context.Context, trace.Span) {
opts := getPublishSpanAttributes(topicName, m)
return tracer().Start(ctx, fmt.Sprintf("%s %s", topicName, publisherSpanName), opts...)
}

func startPublishFlowControlSpan(ctx context.Context) (context.Context, trace.Span) {
return tracer().Start(ctx, publishFlowControlSpanName)
}

func startBatcherSpan(ctx context.Context) (context.Context, trace.Span) {
return tracer().Start(ctx, publishBatcherSpanName)
}

func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption {
// TODO(hongalex): benchmark this to make sure no significant performance degradation
// when calculating proto.Size in receive paths.
Expand All @@ -341,6 +355,20 @@ func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyV
return ss
}

// injectPropagation injects context data into the Pub/Sub message's Attributes field.
func injectPropagation(ctx context.Context, msg *Message) {
// only inject propagation if a valid span context was detected.
if trace.SpanFromContext(ctx).SpanContext().IsValid() {
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
otel.GetTextMapPropagator().Inject(ctx, newMessageCarrier(msg))
}
}

// spanRecordError records the error, sets the status to error, and ends the span.
// This is recommended by https://opentelemetry.io/docs/instrumentation/go/manual/#record-errors
// since RecordError doesn't set the status of a span.
func spanRecordError(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(otelcodes.Error, err.Error())
Expand Down

0 comments on commit ca01559

Please sign in to comment.