-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathotel.go
121 lines (98 loc) · 2.91 KB
/
otel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package otel
import (
"context"
"fmt"
"github.com/alexfalkowski/go-service/meta"
"github.com/alexfalkowski/go-service/otel"
"github.com/alexfalkowski/go-service/transport/nsq/handler"
"github.com/alexfalkowski/go-service/transport/nsq/message"
"github.com/alexfalkowski/go-service/transport/nsq/producer"
"github.com/alexfalkowski/go-service/version"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
)
// TracerParams groups the dependencies NewTracer needs to build the NSQ tracer.
// It embeds fx.In, so it is intended to be filled in by fx dependency injection.
type TracerParams struct {
	fx.In
	// Lifecycle is forwarded to otel.NewTracer.
	Lifecycle fx.Lifecycle
	// Config is the otel configuration forwarded to otel.NewTracer.
	Config *otel.Config
	// Version is the service version forwarded to otel.NewTracer.
	Version version.Version
}
// NewTracer builds the tracer used by the NSQ transport.
//
// It forwards the lifecycle, config and version from params to otel.NewTracer
// under the fixed tracer name "nsq" and returns that call's result unchanged.
func NewTracer(params TracerParams) (Tracer, error) {
	tracerParams := otel.TracerParams{
		Lifecycle: params.Lifecycle,
		Name:      "nsq",
		Config:    params.Config,
		Version:   params.Version,
	}

	return otel.NewTracer(tracerParams)
}
// Tracer is the tracer type used by this package's Handler and Producer.
// It is a distinct named type whose underlying type is trace.Tracer.
// NOTE(review): presumably declared so this tracer can be told apart from
// other trace.Tracer values during dependency injection — confirm.
type Tracer trace.Tracer
// NewHandler wraps h in a Handler that traces every handled message.
//
// topic and channel are used to name the span; tracer starts the spans.
func NewHandler(topic, channel string, tracer Tracer, h handler.Handler) *Handler {
	traced := &Handler{
		topic:   topic,
		channel: channel,
		tracer:  tracer,
		Handler: h,
	}

	return traced
}
// Handler decorates a handler.Handler so that each handled message is recorded
// as a consumer span.
type Handler struct {
	// topic is part of the span name and is reported as the messaging source name.
	topic string
	// channel is part of the span name.
	channel string
	// tracer starts the span for each handled message.
	tracer Tracer
	// Handler is the wrapped handler that processes the message.
	handler.Handler
}
// Handle runs the wrapped handler inside a consumer span named
// "consume <topic>:<channel>".
//
// The context is first passed through extract together with the message
// headers (extract is defined elsewhere; presumably it restores the upstream
// trace context — confirm). If the wrapped handler fails, the span is marked
// as an error and the error is recorded. Every key/value pair returned by
// meta.Attributes is then copied onto the span. The wrapped handler's error is
// returned unchanged.
func (h *Handler) Handle(ctx context.Context, message *message.Message) error {
	ctx = extract(ctx, message.Headers)

	// Re-attach whatever span context ctx carries as a remote parent.
	parent := trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx))

	opts := []trace.SpanStartOption{
		trace.WithSpanKind(trace.SpanKindConsumer),
		trace.WithAttributes(
			semconv.MessagingSystem("nsq"),
			semconv.MessagingSourceKindTopic,
			semconv.MessagingSourceName(h.topic),
		),
	}

	ctx, span := h.tracer.Start(parent, fmt.Sprintf("consume %s:%s", h.topic, h.channel), opts...)
	defer span.End()

	handleErr := h.Handler.Handle(ctx, message)
	if handleErr != nil {
		span.SetStatus(codes.Error, handleErr.Error())
		span.RecordError(handleErr)
	}

	// Attributes are read after the handler has run, from the span's context.
	for name, value := range meta.Attributes(ctx) {
		span.SetAttributes(attribute.Key(name).String(value))
	}

	return handleErr
}
// NewProducer wraps p in a Producer that traces every published message,
// using tracer to start the spans.
func NewProducer(tracer Tracer, p producer.Producer) *Producer {
	traced := &Producer{
		tracer:   tracer,
		Producer: p,
	}

	return traced
}
// Producer decorates a producer.Producer so that each Publish call is recorded
// as a producer span.
type Producer struct {
	// tracer starts the span for each published message.
	tracer Tracer
	// Producer is the wrapped producer that Publish delegates to.
	producer.Producer
}
// Publish sends message to topic through the wrapped producer inside a
// producer span named "publish <topic>".
//
// After the span is started, inject is called with the span's context and the
// message headers (inject is defined elsewhere; presumably it writes the trace
// context into the headers — confirm). If the wrapped producer fails, the span
// is marked as an error and the error is recorded. Every key/value pair
// returned by meta.Attributes is then copied onto the span. The wrapped
// producer's error is returned unchanged.
func (p *Producer) Publish(ctx context.Context, topic string, message *message.Message) error {
	opts := []trace.SpanStartOption{
		trace.WithSpanKind(trace.SpanKindProducer),
		trace.WithAttributes(
			semconv.MessagingSystem("nsq"),
			semconv.MessagingDestinationKindTopic,
			semconv.MessagingDestinationName(topic),
		),
	}

	ctx, span := p.tracer.Start(ctx, fmt.Sprintf("publish %s", topic), opts...)
	defer span.End()

	// Must run after Start so that inject sees the new span's context.
	inject(ctx, message.Headers)

	publishErr := p.Producer.Publish(ctx, topic, message)
	if publishErr != nil {
		span.SetStatus(codes.Error, publishErr.Error())
		span.RecordError(publishErr)
	}

	for name, value := range meta.Attributes(ctx) {
		span.SetAttributes(attribute.Key(name).String(value))
	}

	return publishErr
}