Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve tracing with otel #451

Merged
merged 16 commits into from
Sep 30, 2022
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ See the [中文文档](http://jupiter.douyu.com/) for the Chinese documentation.
go install github.com/douyu/jupiter/cmd/jupiter@latest
jupiter new example-go
cd example-go
docker-compose up
jupiter run -c cmd/exampleserver/.jupiter.toml
```

Expand Down
4 changes: 2 additions & 2 deletions pkg/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ func (app *Application) Run(servers ...server.Server) error {
app.servers = append(app.servers, servers...)
app.smu.Unlock()

hooks.Do(hooks.Stage_BeforeRun)

app.waitSignals() //start signal listen task in goroutine
defer app.clean()

hooks.Do(hooks.Stage_BeforeRun)

// todo jobs not graceful
_ = app.startJobs()

Expand Down
28 changes: 17 additions & 11 deletions pkg/client/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ import (
"strings"
"time"

"go.etcd.io/etcd/client/v3/concurrency"

"github.com/douyu/jupiter/pkg/xlog"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
clientv3 "go.etcd.io/etcd/client/v3"

//"go.etcd.io/etcd/mvcc/mvccpb"
mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"google.golang.org/grpc"
)

Expand All @@ -42,17 +39,26 @@ type Client struct {

// New ...
func newClient(config *Config) (*Client, error) {
dialOptions := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpcprom.StreamClientInterceptor),
}

if config.EnableTrace {
dialOptions = append(dialOptions,
grpc.WithChainUnaryInterceptor(traceUnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(traceStreamClientInterceptor()),
)
}

conf := clientv3.Config{
Endpoints: config.Endpoints,
DialTimeout: config.ConnectTimeout,
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
},
AutoSyncInterval: config.AutoSyncInterval,
DialOptions: dialOptions,
AutoSyncInterval: config.AutoSyncInterval,
}

config.logger = config.logger.With(xlog.FieldAddrAny(config.Endpoints))
Expand Down
5 changes: 4 additions & 1 deletion pkg/client/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ type (
// 自动同步member list的间隔
AutoSyncInterval time.Duration `json:"autoAsyncInterval"`
TTL int // 单位:s
logger *xlog.Logger
EnableTrace bool `json:"enableTrace" toml:"enableTrace"`

logger *xlog.Logger
}
)

Expand All @@ -60,6 +62,7 @@ func DefaultConfig() *Config {
BasicAuth: false,
ConnectTimeout: cast.ToDuration("5s"),
Secure: false,
EnableTrace: true,
logger: xlog.Jupiter().With(xlog.FieldMod("client.etcd")),
}
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/client/etcdv3/intercept.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package etcdv3

import (
"context"

"github.com/douyu/jupiter/pkg/xtrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

func traceUnaryClientInterceptor() grpc.UnaryClientInterceptor {
tracer := xtrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemGRPC,
}

return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {

md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
md = md.Copy()
}

ctx, span := tracer.Start(ctx, method, xtrace.MetadataReaderWriter(md), trace.WithAttributes(attrs...))
ctx = metadata.NewOutgoingContext(ctx, md)

span.SetAttributes(
semconv.RPCMethodKey.String(method),
)

err = invoker(ctx, method, req, reply, cc, opts...)

span.SetStatus(codes.Ok, "ok")

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

span.End()

return err
}
}

func traceStreamClientInterceptor() grpc.StreamClientInterceptor {
tracer := xtrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemGRPC,
}

return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {

md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
md = md.Copy()
}

ctx, span := tracer.Start(ctx, method, xtrace.MetadataReaderWriter(md), trace.WithAttributes(attrs...))
ctx = metadata.NewOutgoingContext(ctx, md)

span.SetAttributes(
semconv.RPCMethodKey.String(method),
)

clientStream, err := streamer(ctx, desc, cc, method, opts...)

if err != nil {
span.SetStatus(codes.Ok, "ok")

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

span.End()
}

return clientStream, nil
}
}
33 changes: 19 additions & 14 deletions pkg/client/grpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/fatih/color"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -96,32 +96,37 @@ func debugUnaryClientInterceptor(addr string) grpc.UnaryClientInterceptor {
}

func traceUnaryClientInterceptor() grpc.UnaryClientInterceptor {
tracer := xtrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemGRPC,
}

return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
md = md.Copy()
}
tracer := xtrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
}

ctx, span := tracer.Start(ctx, method, xtrace.MetadataReaderWriter(md), trace.WithAttributes(attrs...))
ctx = metadata.NewOutgoingContext(ctx, md)
span.SetAttributes(
semconv.RPCMethodKey.String(method),
)
defer func() {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.SetStatus(codes.Ok, "ok")
span.End()
}()
return invoker(ctx, method, req, reply, cc, opts...)

err = invoker(ctx, method, req, reply, cc, opts...)

span.SetStatus(codes.Ok, "ok")

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

span.End()

return err
}
}

Expand Down
33 changes: 17 additions & 16 deletions pkg/client/resty/resty.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var errSlowCommand = errors.New("http resty slow command")

// Client ...
type Client = resty.Client

// Config ...
type (
// Config options
Expand Down Expand Up @@ -75,12 +78,12 @@ type (
)

// StdConfig 返回标准配置
func StdConfig(name string) Config {
func StdConfig(name string) *Config {
return RawConfig("jupiter.resty." + name)
}

// RawConfig 返回配置
func RawConfig(key string) Config {
func RawConfig(key string) *Config {
var config = DefaultConfig()
if err := conf.UnmarshalKey(key, &config, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldName(key), xlog.FieldExtMessage(config))
Expand All @@ -93,8 +96,8 @@ func RawConfig(key string) Config {
}

// DefaultConfig 返回默认配置
func DefaultConfig() Config {
return Config{
func DefaultConfig() *Config {
return &Config{
Debug: false,
EnableMetric: true,
EnableTrace: true,
Expand Down Expand Up @@ -154,19 +157,16 @@ func (config *Config) Build() (*resty.Client, error) {

})

tracer := xtrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("http"),
}

client.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
if config.EnableTrace {
tracer := xtrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("http"),
}
ctx, span := tracer.Start(r.Context(), r.Method, propagation.HeaderCarrier(r.Header), trace.WithAttributes(attrs...))
span.SetAttributes(
semconv.RPCSystemKey.String("http"),
semconv.PeerServiceKey.String("http_client_request"),
semconv.HTTPMethodKey.String(r.Method),
semconv.HTTPURLKey.String(r.URL),
)

ctx, _ := tracer.Start(r.Context(), r.URL, propagation.HeaderCarrier(r.Header), trace.WithAttributes(attrs...))

r.SetContext(ctx)
}

Expand Down Expand Up @@ -198,6 +198,7 @@ func (config *Config) Build() (*resty.Client, error) {

if config.EnableTrace {
span := trace.SpanFromContext(r.Request.Context())
span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(r.Request.RawRequest)...)
span.SetAttributes(
semconv.HTTPStatusCodeKey.Int64(int64(r.StatusCode())),
)
Expand Down
Loading