Skip to content

Commit

Permalink
fix: trace integration (#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Nov 18, 2022
1 parent 2240b05 commit 7f255f8
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 37 deletions.
9 changes: 0 additions & 9 deletions pkg/client/redis/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,6 @@ func accessInterceptor(compName string, addr string, config *Config, logger *xlo
xlog.Any("req", cmd.Args()),
xlog.FieldCost(cost))

// 记录链路id
if traceId := xlog.GetTraceID(ctx); len(traceId) > 0 {
fields = append(fields, xlog.String("trace_id", traceId))
}
// error
if err != nil {
fields = append(fields, xlog.FieldErr(err))
Expand Down Expand Up @@ -226,11 +222,6 @@ func accessInterceptor(compName string, addr string, config *Config, logger *xlo
xlog.Any("req", cmd.Args()),
xlog.FieldCost(cost))

// 记录链路id
if traceId := xlog.GetTraceID(ctx); len(traceId) > 0 {
fields = append(fields, xlog.String("trace_id", traceId))
}

// error
if err != nil {
fields = append(fields, xlog.FieldErr(err))
Expand Down
4 changes: 1 addition & 3 deletions pkg/client/redis/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"testing"
"time"

"github.com/douyu/jupiter/pkg/xlog"

"github.com/douyu/jupiter/pkg/core/xtrace"
"github.com/douyu/jupiter/pkg/core/xtrace/jaeger"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -54,7 +52,7 @@ func Test_Interceptor(t *testing.T) {
})

t.Run("access", func(t *testing.T) {
ctx := xlog.SetTraceID(context.Background(), "123456")
ctx := context.TODO()
config.EnableAccessLogInterceptor = true
client, _ := config.Build()

Expand Down
2 changes: 2 additions & 0 deletions pkg/client/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (cc *PushConsumer) RegisterSingleMessage(f func(context.Context, *primitive
semconv.MessagingRocketmqMessageTagKey.String(msg.GetTags()),
)

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

defer span.End()
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/client/rocketmq/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor {
tracer := xtrace.NewTracer(trace.SpanKindProducer)
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("rocketmq"),
semconv.MessagingRocketmqClientGroupKey.String(producer.Group),
semconv.MessagingRocketmqClientIDKey.String(producer.InstanceName),
}

return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/rocketmq/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ func DefaultConfig() *Config {
return &Config{
Addresses: make([]string, 0),
Producer: &ProducerConfig{
Retry: 3,
Retry: 3,
EnableTrace: true,
},
Consumer: &ConsumerConfig{
Reconsume: 3,
WaitMaxDuration: 60 * time.Second,
MessageModel: "Clustering",
EnableTrace: true,
},
}
}
Expand All @@ -106,6 +108,7 @@ func DefaultConsumerConfig() *ConsumerConfig {
Reconsume: 3,
WaitMaxDuration: 60 * time.Second,
MessageModel: "Clustering",
EnableTrace: true,
}
}

Expand All @@ -115,6 +118,7 @@ func DefaultProducerConfig() *ProducerConfig {
Retry: 3,
DialTimeout: time.Second * 3,
RwTimeout: 0,
EnableTrace: true,
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/client/rocketmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (pc *Producer) Close() error {
}

// Send rocketmq发送消息
// Deprecated: use SendWithContext instead
func (pc *Producer) Send(msg []byte) error {
m := primitive.NewMessage(pc.Topic, msg)
_, err := pc.SendSync(context.Background(), m)
Expand All @@ -151,6 +152,7 @@ func (pc *Producer) SendWithContext(ctx context.Context, msg []byte) error {
}

// SendWithTag rocket mq 发送消息,可以自定义选择 tag
// Deprecated: use SendWithMsg instead
func (pc *Producer) SendWithTag(msg []byte, tag string) error {
m := primitive.NewMessage(pc.Topic, msg)
if tag != "" {
Expand All @@ -166,6 +168,7 @@ func (pc *Producer) SendWithTag(msg []byte, tag string) error {
}

// SendWithResult rocket mq 发送消息,可以自定义选择 tag 及返回结果
// Deprecated: use SendWithMsg instead
func (pc *Producer) SendWithResult(msg []byte, tag string) (*primitive.SendResult, error) {
m := primitive.NewMessage(pc.Topic, msg)
if tag != "" {
Expand All @@ -181,6 +184,7 @@ func (pc *Producer) SendWithResult(msg []byte, tag string) (*primitive.SendResul
}

// SendMsg... 自定义消息格式
// Deprecated: use SendWithMsg instead.
func (pc *Producer) SendMsg(msg *primitive.Message) (*primitive.SendResult, error) {
msg.Topic = pc.Topic
res, err := pc.SendSync(context.Background(), msg)
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/xecho/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func traceServerInterceptor() echo.MiddlewareFunc {
return func(c echo.Context) (err error) {
ctx, span := tracer.Start(c.Request().Context(), c.Request().URL.Path, propagation.HeaderCarrier(c.Request().Header), trace.WithAttributes(attrs...))
span.SetAttributes(semconv.HTTPServerAttributesFromHTTPRequest(pkg.Name(), c.Request().URL.Path, c.Request())...)

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

c.SetRequest(c.Request().WithContext(ctx))
defer span.End()
return next(c)
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/xgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func NewTraceUnaryServerInterceptor() grpc.UnaryServerInterceptor {
}
span.End()
}()

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

return handler(ctx, req)
}
}
Expand Down Expand Up @@ -131,6 +134,9 @@ func NewTraceStreamServerInterceptor() grpc.StreamServerInterceptor {

ctx, span := tracer.Start(ss.Context(), operation, xtrace.MetadataReaderWriter(md), trace.WithAttributes(attrs...))
defer span.End()

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

return handler(srv, contextedServerStream{
ServerStream: ss,
ctx: ctx,
Expand Down
24 changes: 2 additions & 22 deletions pkg/xlog/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@ const (
)

type (
loggerKey struct{}
traceIDKey struct{}
loggerKey struct{}
)

func NewContext(ctx context.Context, l *Logger) context.Context {
traceID := GetTraceID(ctx)
if traceID == "" {
return context.WithValue(ctx, loggerKey{}, l)
}
func NewContext(ctx context.Context, l *Logger, traceID string) context.Context {
return context.WithValue(ctx, loggerKey{}, l.With(String(traceIDField, traceID)))
}

Expand All @@ -46,18 +41,3 @@ func FromContext(ctx context.Context) *Logger {
}
return l
}

func SetTraceID(ctx context.Context, traceID string) context.Context {
if traceID == "" {
return ctx
}
return context.WithValue(ctx, traceIDKey{}, traceID)
}

func GetTraceID(ctx context.Context) string {
traceID, ok := ctx.Value(traceIDKey{}).(string)
if !ok {
return ""
}
return traceID
}
4 changes: 2 additions & 2 deletions pkg/xlog/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func Test_log(t *testing.T) {

func Test_trace(t *testing.T) {

log := xlog.Jupiter().With(xlog.String("traceid", "a:b:c:1"))
ctx := xlog.NewContext(context.TODO(), log)
log := xlog.Jupiter()
ctx := xlog.NewContext(context.TODO(), log, "a:b:c:1")

stdlog := xlog.FromContext(ctx)
stdlog.Debug("debug", xlog.Any("a", "b"))
Expand Down

0 comments on commit 7f255f8

Please sign in to comment.