Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trace integration #569

Merged
merged 3 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions pkg/client/redis/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,6 @@ func accessInterceptor(compName string, addr string, config *Config, logger *xlo
xlog.Any("req", cmd.Args()),
xlog.FieldCost(cost))

// 记录链路id
if traceId := xlog.GetTraceID(ctx); len(traceId) > 0 {
fields = append(fields, xlog.String("trace_id", traceId))
}
// error
if err != nil {
fields = append(fields, xlog.FieldErr(err))
Expand Down Expand Up @@ -226,11 +222,6 @@ func accessInterceptor(compName string, addr string, config *Config, logger *xlo
xlog.Any("req", cmd.Args()),
xlog.FieldCost(cost))

// 记录链路id
if traceId := xlog.GetTraceID(ctx); len(traceId) > 0 {
fields = append(fields, xlog.String("trace_id", traceId))
}

// error
if err != nil {
fields = append(fields, xlog.FieldErr(err))
Expand Down
4 changes: 1 addition & 3 deletions pkg/client/redis/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"testing"
"time"

"github.com/douyu/jupiter/pkg/xlog"

"github.com/douyu/jupiter/pkg/core/xtrace"
"github.com/douyu/jupiter/pkg/core/xtrace/jaeger"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -54,7 +52,7 @@ func Test_Interceptor(t *testing.T) {
})

t.Run("access", func(t *testing.T) {
ctx := xlog.SetTraceID(context.Background(), "123456")
ctx := context.TODO()
config.EnableAccessLogInterceptor = true
client, _ := config.Build()

Expand Down
2 changes: 2 additions & 0 deletions pkg/client/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (cc *PushConsumer) RegisterSingleMessage(f func(context.Context, *primitive
semconv.MessagingRocketmqMessageTagKey.String(msg.GetTags()),
)

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

defer span.End()
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/client/rocketmq/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor {
tracer := xtrace.NewTracer(trace.SpanKindProducer)
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("rocketmq"),
semconv.MessagingRocketmqClientGroupKey.String(producer.Group),
semconv.MessagingRocketmqClientIDKey.String(producer.InstanceName),
}

return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/rocketmq/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ func DefaultConfig() *Config {
return &Config{
Addresses: make([]string, 0),
Producer: &ProducerConfig{
Retry: 3,
Retry: 3,
EnableTrace: true,
},
Consumer: &ConsumerConfig{
Reconsume: 3,
WaitMaxDuration: 60 * time.Second,
MessageModel: "Clustering",
EnableTrace: true,
},
}
}
Expand All @@ -106,6 +108,7 @@ func DefaultConsumerConfig() *ConsumerConfig {
Reconsume: 3,
WaitMaxDuration: 60 * time.Second,
MessageModel: "Clustering",
EnableTrace: true,
}
}

Expand All @@ -115,6 +118,7 @@ func DefaultProducerConfig() *ProducerConfig {
Retry: 3,
DialTimeout: time.Second * 3,
RwTimeout: 0,
EnableTrace: true,
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/client/rocketmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (pc *Producer) Close() error {
}

// Send rocketmq发送消息
// Deprecated: use SendWithContext instead
func (pc *Producer) Send(msg []byte) error {
m := primitive.NewMessage(pc.Topic, msg)
_, err := pc.SendSync(context.Background(), m)
Expand All @@ -151,6 +152,7 @@ func (pc *Producer) SendWithContext(ctx context.Context, msg []byte) error {
}

// SendWithTag rocket mq 发送消息,可以自定义选择 tag
// Deprecated: use SendWithMsg instead
func (pc *Producer) SendWithTag(msg []byte, tag string) error {
m := primitive.NewMessage(pc.Topic, msg)
if tag != "" {
Expand All @@ -166,6 +168,7 @@ func (pc *Producer) SendWithTag(msg []byte, tag string) error {
}

// SendWithResult rocket mq 发送消息,可以自定义选择 tag 及返回结果
// Deprecated: use SendWithMsg instead
func (pc *Producer) SendWithResult(msg []byte, tag string) (*primitive.SendResult, error) {
m := primitive.NewMessage(pc.Topic, msg)
if tag != "" {
Expand All @@ -181,6 +184,7 @@ func (pc *Producer) SendWithResult(msg []byte, tag string) (*primitive.SendResul
}

// SendMsg... 自定义消息格式
// Deprecated: use SendWithMsg instead.
func (pc *Producer) SendMsg(msg *primitive.Message) (*primitive.SendResult, error) {
msg.Topic = pc.Topic
res, err := pc.SendSync(context.Background(), msg)
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/xecho/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func traceServerInterceptor() echo.MiddlewareFunc {
return func(c echo.Context) (err error) {
ctx, span := tracer.Start(c.Request().Context(), c.Request().URL.Path, propagation.HeaderCarrier(c.Request().Header), trace.WithAttributes(attrs...))
span.SetAttributes(semconv.HTTPServerAttributesFromHTTPRequest(pkg.Name(), c.Request().URL.Path, c.Request())...)

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

c.SetRequest(c.Request().WithContext(ctx))
defer span.End()
return next(c)
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/xgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func NewTraceUnaryServerInterceptor() grpc.UnaryServerInterceptor {
}
span.End()
}()

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

return handler(ctx, req)
}
}
Expand Down Expand Up @@ -131,6 +134,9 @@ func NewTraceStreamServerInterceptor() grpc.StreamServerInterceptor {

ctx, span := tracer.Start(ss.Context(), operation, xtrace.MetadataReaderWriter(md), trace.WithAttributes(attrs...))
defer span.End()

ctx = xlog.NewContext(ctx, xlog.Default(), span.SpanContext().TraceID().String())

return handler(srv, contextedServerStream{
ServerStream: ss,
ctx: ctx,
Expand Down
24 changes: 2 additions & 22 deletions pkg/xlog/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@ const (
)

type (
loggerKey struct{}
traceIDKey struct{}
loggerKey struct{}
)

func NewContext(ctx context.Context, l *Logger) context.Context {
traceID := GetTraceID(ctx)
if traceID == "" {
return context.WithValue(ctx, loggerKey{}, l)
}
func NewContext(ctx context.Context, l *Logger, traceID string) context.Context {
return context.WithValue(ctx, loggerKey{}, l.With(String(traceIDField, traceID)))
}

Expand All @@ -46,18 +41,3 @@ func FromContext(ctx context.Context) *Logger {
}
return l
}

func SetTraceID(ctx context.Context, traceID string) context.Context {
if traceID == "" {
return ctx
}
return context.WithValue(ctx, traceIDKey{}, traceID)
}

func GetTraceID(ctx context.Context) string {
traceID, ok := ctx.Value(traceIDKey{}).(string)
if !ok {
return ""
}
return traceID
}
4 changes: 2 additions & 2 deletions pkg/xlog/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func Test_log(t *testing.T) {

func Test_trace(t *testing.T) {

log := xlog.Jupiter().With(xlog.String("traceid", "a:b:c:1"))
ctx := xlog.NewContext(context.TODO(), log)
log := xlog.Jupiter()
ctx := xlog.NewContext(context.TODO(), log, "a:b:c:1")

stdlog := xlog.FromContext(ctx)
stdlog.Debug("debug", xlog.Any("a", "b"))
Expand Down