-
Notifications
You must be signed in to change notification settings - Fork 3
/
producer.go
97 lines (87 loc) · 2.5 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package rocketmq
import (
"context"
"fmt"
v2 "github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/go-bamboo/pkg/log"
otelext "github.com/go-bamboo/pkg/otel"
"github.com/go-kratos/kratos/v2/metrics"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// A Pusher interface wraps the method Push.
type Pusher interface {
Name() string
Push(ctx context.Context, topic string, key, value []byte) error
Close() error
}
// rocketProducer 生产者
type rocketProducer struct {
producer v2.Producer
tracer trace.Tracer
propagator propagation.TextMapPropagator
pubCounter metrics.Counter // 发送次数
// topic string
}
func MustNewPusher(c *Conf) Pusher {
pub, err := NewPusher(c)
if err != nil {
log.Fatal(err)
}
return pub
}
func NewPusher(config *Conf) (Pusher, error) {
pd, err := v2.NewProducer(
producer.WithGroupName(config.GroupId),
producer.WithNameServer(config.Addrs),
producer.WithRetry(3),
producer.WithCredentials(primitive.Credentials{
AccessKey: config.AccessKey,
SecretKey: config.SecretKey,
}),
producer.WithNamespace(config.Namespace),
)
if err != nil {
return nil, fmt.Errorf("create new pd err:%s", err)
}
if err := pd.Start(); err != nil {
return nil, err
}
tracingPub := &rocketProducer{
producer: pd,
tracer: otel.Tracer("roketmq"),
propagator: propagation.NewCompositeTextMapPropagator(otelext.Metadata{}, propagation.Baggage{}, otelext.TraceContext{}),
// topic: c.Topic,
}
return tracingPub, nil
}
func (p *rocketProducer) Name() string {
return "rocketmq"
}
func (p *rocketProducer) Push(ctx context.Context, topic string, key, value []byte) error {
msg := primitive.NewMessage(topic, value)
msg.WithTag("")
msg.WithKeys([]string{string(key)})
operation := "pub:" + topic
ctx, span := p.tracer.Start(ctx, operation, trace.WithSpanKind(trace.SpanKindProducer))
p.propagator.Inject(ctx, &MessageTextMapCarrier{msg: msg})
span.SetAttributes(
attribute.String("kafka.topic", topic),
attribute.String("kafka.key", string(key)),
)
sendResult, err := p.producer.SendSync(ctx, msg)
if err != nil {
log.Error(err)
return err
}
log.Debugf("Delivered message to topic %s [%v] at offset %v", topic, sendResult.RegionID, sendResult.OffsetMsgID)
return nil
}
func (p *rocketProducer) Close() error {
p.producer.Shutdown()
return nil
}