-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtracer.go
119 lines (96 loc) · 2.75 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package tracer
import (
"context"
"fmt"
"github.com/alexfalkowski/go-service/env"
"github.com/alexfalkowski/go-service/meta"
"github.com/alexfalkowski/go-service/nsq"
"github.com/alexfalkowski/go-service/telemetry/tracer"
"github.com/alexfalkowski/go-service/version"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
)
// Params groups the dependencies NewTracer needs to build the nsq tracer.
//
// It embeds fx.In, which marks it as an fx parameter struct.
type Params struct {
fx.In
// Lifecycle is forwarded to tracer.NewTracer.
Lifecycle fx.Lifecycle
// Config is the telemetry tracer configuration forwarded to tracer.NewTracer.
Config *tracer.Config
// Version is forwarded to tracer.NewTracer.
Version version.Version
// Environment is forwarded to tracer.NewTracer.
Environment env.Environment
}
// NewTracer builds the Tracer used for nsq instrumentation.
//
// It delegates to tracer.NewTracer with the fixed name "nsq" and the
// lifecycle, environment, version and config carried by params, returning
// whatever tracer and error that call produces.
func NewTracer(params Params) (Tracer, error) {
	const name = "nsq"

	return tracer.NewTracer(
		params.Lifecycle,
		name,
		params.Environment,
		params.Version,
		params.Config,
	)
}
// Tracer is the tracer type used by this package's Consumer and Producer.
// It is a named type whose underlying type is trace.Tracer.
type Tracer trace.Tracer
// NewConsumer wraps h in a Consumer that traces every consumed message.
//
// topic and channel are stored for naming the span, tracer is the Tracer the
// span is started from, and h is the consumer the call is delegated to.
func NewConsumer(topic, channel string, tracer Tracer, h nsq.Consumer) *Consumer {
	traced := &Consumer{
		Consumer: h,
		tracer:   tracer,
	}
	traced.topic, traced.channel = topic, channel

	return traced
}
// Consumer wraps an nsq.Consumer and records a consumer span around each
// call to Consume.
type Consumer struct {
// topic and channel name the span ("consume <topic>:<channel>"); topic is
// also recorded as the messaging destination name.
topic, channel string
// tracer is used to start the span for each consumed message.
tracer Tracer
// Consumer is the embedded consumer that Consume delegates to.
nsq.Consumer
}
// Consume runs the wrapped consumer inside a consumer-kind span named
// "consume <topic>:<channel>".
//
// The context is first passed through extract together with the message
// headers (extract is defined elsewhere in this package; presumably it
// restores propagated trace context - confirm there). The span context found
// in the resulting context is re-attached as a remote span context before the
// span is started, and the span is ended when Consume returns.
//
// If the wrapped consumer returns an error, the span status is set to Error
// and the error is recorded on the span; the error is returned unchanged.
// Every entry of meta.Attributes(ctx) is added to the span as a string
// attribute before returning.
func (c *Consumer) Consume(ctx context.Context, message *nsq.Message) error {
	ctx = extract(ctx, message.Headers)
	parent := trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx))

	ctx, span := c.tracer.Start(
		parent,
		fmt.Sprintf("consume %s:%s", c.topic, c.channel),
		trace.WithSpanKind(trace.SpanKindConsumer),
		trace.WithAttributes(
			semconv.MessagingSystem("nsq"),
			semconv.MessagingDestinationName(c.topic),
		),
	)
	defer span.End()

	err := c.Consumer.Consume(ctx, message)
	if err != nil {
		span.SetStatus(codes.Error, err.Error())
		span.RecordError(err)
	}

	// Attributes are read after the wrapped consumer has run, using the span's context.
	for key, value := range meta.Attributes(ctx) {
		span.SetAttributes(attribute.Key(key).String(value))
	}

	return err
}
// NewProducer wraps p in a Producer that traces every produced message.
//
// tracer is the Tracer each span is started from, and p is the producer the
// call is delegated to.
func NewProducer(tracer Tracer, p nsq.Producer) *Producer {
	traced := new(Producer)
	traced.tracer = tracer
	traced.Producer = p

	return traced
}
// Producer wraps an nsq.Producer and records a producer span around each
// call to Produce.
type Producer struct {
// tracer is used to start the span for each produced message.
tracer Tracer
// Producer is the embedded producer that Produce delegates to.
nsq.Producer
}
// Produce publishes message to topic through the wrapped producer inside a
// producer-kind span named "publish <topic>".
//
// After the span is started, inject is called with the span's context and the
// message headers (inject is defined elsewhere in this package; presumably it
// writes the trace context into the headers - confirm there). The span is
// ended when Produce returns.
//
// If the wrapped producer returns an error, the span status is set to Error
// and the error is recorded on the span; the error is returned unchanged.
// Every entry of meta.Attributes(ctx) is added to the span as a string
// attribute before returning.
func (p *Producer) Produce(ctx context.Context, topic string, message *nsq.Message) error {
	ctx, span := p.tracer.Start(
		ctx,
		fmt.Sprintf("publish %s", topic),
		trace.WithSpanKind(trace.SpanKindProducer),
		trace.WithAttributes(
			semconv.MessagingSystem("nsq"),
			semconv.MessagingDestinationName(topic),
		),
	)
	defer span.End()

	// Must run after Start so the headers are injected from the new span's context.
	inject(ctx, message.Headers)

	err := p.Producer.Produce(ctx, topic, message)
	if err != nil {
		span.SetStatus(codes.Error, err.Error())
		span.RecordError(err)
	}

	for key, value := range meta.Attributes(ctx) {
		span.SetAttributes(attribute.Key(key).String(value))
	}

	return err
}