diff --git a/README.md b/README.md index ede24d4..bc18288 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,14 @@ traceandtrace-go is go tracing lib. It integrate multi tracer such as jeager,zipkin,skywalking and so on
## Version introduction -- v1.0.0 only support jeager +### v1.0.0 +- only support jeager - support http and gRPC (or both) tracing - support sampler, sampler type and collector env setting +### v1.0.3 +- support jeager and zipkin + ## API [godoc](https://pkg.go.dev/github.com/codeandcode0x/traceandtrace-go) @@ -22,11 +26,12 @@ traceandtrace-go is go tracing lib. It integrate multi tracer such as jeager,zip | ---- | ---- | | TRACE_SAMPLER_TYPE | const/probabilistic/ratelimiting/remote | | TRACE_SAMPLER_PARAM | 0-1 | -| TRACE_ENDPOINT | http://localhost:14268/api/traces | -| TRACE_AGENT_HOST | localhost:6831 | -| TRACE_REPORTER_LOG_SPANS | false/ture | +| TRACE_ENDPOINT | http://localhost:14268/api/traces (jaeger) or http://localhost:9411/api/v2/spans (zipkin) | +| TRACE_AGENT_HOST | localhost:6831 (jaeger) | +| TRACE_REPORTER_LOG_SPANS | false or ture | +| TRACE_TYPE | jaeger or zipkin | -## Ext field +## Jaeger Ext field spanKind
component
samplingPriority
@@ -45,23 +50,27 @@ httpMethod
dbUser
messageBusDestination
-## quick start +## Quick Start -### start jaeger +### Start Jaeger ```shell -docker run \ +docker run -d --name jaeger \ +-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \ -p 5775:5775/udp \ --p 16686:16686 \ -p 6831:6831/udp \ -p 6832:6832/udp \ -p 5778:5778 \ +-p 16686:16686 \ -p 14268:14268 \ +-p 14250:14250 \ +-p 9411:9411 \ ethansmart-docker.pkg.coding.net/istioalltime/roandocker/jaegertracing-all-in-one:1.22.0 + ``` -### import package +### Import Package ```shell go get github.com/codeandcode0x/traceandtrace-go @@ -70,14 +79,14 @@ go get github.com/codeandcode0x/traceandtrace-go ### HTTP tracing Create a trace on the http request method side. -![http to grpc client](wiki/imgs/http_client.jpg) +![http to grpc client](wiki/imgs/http_client_2.jpg) tags are map[string]string type, you can pass logs k-v, tag and field. -### RPC tracing +### gRPC tracing Create a trace on the rpc request method side -- **client** +**client** ```go import ( @@ -94,11 +103,11 @@ if err != nil { } ... ``` -- **server** +**server** ```go import ( - tracing "github.com/codeandcode0x/traceandtrace-go/wrapper/rpc" + tracing "github.com/codeandcode0x/traceandtrace-go" ) //No need to request other rpc services @@ -126,7 +135,8 @@ newRpcServiceReq(tracer) ### Http to gRPC tracing ![http to grpc client](wiki/imgs/httptogrpc_client.jpg) -To call gRPC on the http server side, you need to add the parent context to the rpc client. For details, you can see the [example](example/http/httpServer.go) . +**ptx** is parent context, it can create sub-context trace span
+To call gRPC on the http server side, you need to add the parent context to the gRPC client. For details, you can see the [example](example/http/httpServer.go) . ## Concurrent Processing ### goroutine context control @@ -135,9 +145,9 @@ To call gRPC on the http server side, you need to add the parent context to the - By context WithCancel() create sub-coroutine sessions and manage coroutine tasks ; - every context will carry related data of parent trace and child span ; -![goroutine session](https://images2018.cnblogs.com/blog/1048291/201806/1048291-20180629074859717-1555813847.png) +![goroutine session](wiki/imgs/goroutine.png) -### trace job control +### Trace Job Control start and end trace job ```go diff --git a/example/grpc/grpcClient.go b/example/grpc/grpcClient.go index f8b66e4..5fbc638 100644 --- a/example/grpc/grpcClient.go +++ b/example/grpc/grpcClient.go @@ -19,7 +19,9 @@ func main() { func gRPCExample() { address := "localhost:22530" defaultName := "ethan" - rpcOption, closer := tracing.AddRpcClientTracing("RpcClientExample") + rpcOption, closer := tracing.AddRpcClientTracing( + "RpcClientExample", + map[string]string{"version": "v1"}) defer closer.Close() // Set up a connection to the server. diff --git a/example/grpc/grpcServer.go b/example/grpc/grpcServer.go index 1db72e2..894cebc 100644 --- a/example/grpc/grpcServer.go +++ b/example/grpc/grpcServer.go @@ -32,7 +32,9 @@ func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloRe } func main() { - rpcOption, closer, _ := tracing.AddRpcServerTracing("RpcServer") + rpcOption, closer, _ := tracing.AddRpcServerTracing( + "RpcServer", + map[string]string{"version": "v1"}) defer closer.Close() lis, err := net.Listen("tcp", port) diff --git a/example/http/httpClient.go b/example/http/httpClient.go index 8edf82a..417627e 100644 --- a/example/http/httpClient.go +++ b/example/http/httpClient.go @@ -20,7 +20,12 @@ func httpClient() { httpClient := &http.Client{} r, _ := http.NewRequest("GET", httpTogRPCSrcUrl, nil) // set tracing - _, cancel := tracing.AddHttpTracing("HttpClent", "rpc/tracing GET", r.Header, map[string]string{"version": "v1"}) + _, cancel := tracing.AddHttpTracing( + "HttpClient", + "/rpc/tracing GET", r.Header, + map[string]string{"version": "v1"}) + // or map[string]string{"traceType": "zipkin", "version": "v1"}), traceType : jaeger (default) or zipkin + // or export TRACE_TYPE=zipkin or jaeger defer cancel() // send reqeust response, _ := httpClient.Do(r) diff --git a/example/http/httpServer.go b/example/http/httpServer.go index 89bfe1d..7b23766 100644 --- a/example/http/httpServer.go +++ b/example/http/httpServer.go @@ -9,7 +9,7 @@ import ( "time" tracing "github.com/codeandcode0x/traceandtrace-go" - pb "github.com/codeandcode0x/traceandtrace-go/example/helloworld/proto" + pb "github.com/codeandcode0x/traceandtrace-go/example/protos/helloworld" "google.golang.org/grpc" ) @@ -20,8 +20,10 @@ func main() { // http to gRPC func httpServer() { http.HandleFunc("/rpc/tracing", func(w http.ResponseWriter, r *http.Request) { - log.Println(".............. header ", r.Header) - pctx, cancel := tracing.AddHttpTracing("HttpServer", "/rpc/tracing GET", r.Header, map[string]string{"version": "v1"}) + pctx, cancel := tracing.AddHttpTracing( + "HttpServer", + "/rpc/tracing GET", r.Header, + map[string]string{"version": "v1"}) defer cancel() // rpc tracing result := RpcClient(pctx) @@ -33,7 +35,11 @@ func httpServer() { //grpc request func RpcClient(ptx context.Context) string { - rpcOption, closer := tracing.AddRpcClientTracing("RpcClient") + rpcOption, closer := tracing.AddRpcClientTracing( + "RpcClient", + map[string]string{"version": "v1"}) + // or map[string]string{"traceType": "zipkin", "version": "v1"}), traceType : jaeger (default) or zipkin + // or export TRACE_TYPE=zipkin or jaeger defer closer.Close() address := "localhost:22530" conn, err := grpc.Dial(address, grpc.WithInsecure(), rpcOption) diff --git a/reporter_jobs.go b/reporter_jobs.go index d522470..55c6562 100644 --- a/reporter_jobs.go +++ b/reporter_jobs.go @@ -22,6 +22,8 @@ import ( "io" "log" "net/http" + "os" + "strings" tracing "github.com/codeandcode0x/traceandtrace-go/tracer" opentracing "github.com/opentracing/opentracing-go" @@ -52,18 +54,39 @@ func GenerateTracingJobs(pch chan<- context.Context, parent context.Context, svc // do trace reporter func doTask(ch chan context.Context, parent context.Context, svc, spanName string, header http.Header, tags map[string]string, traceType string) { - //定义 tracer, closer + //define tracer, closer var tracer opentracing.Tracer var closer io.Closer var ctx context.Context + //init tracer + tracer, closer = SelectInitTracer(svc, map[string]string{"traceType": traceType}) + ctx = tracing.AddHttpTracer(svc, spanName, parent, header, tracer, tags) + //close + defer closer.Close() + ch <- ctx +} + +//select init tracer +func SelectInitTracer(svc string, param ...map[string]string) (opentracing.Tracer, io.Closer) { + var tracer opentracing.Tracer + var closer io.Closer + // get tracer type + traceType := JAEGER_TRACER + if tType := os.Getenv("TRACE_TYPE"); tType != "" { + traceType = tType + } else if len(param) > 0 { + if _, exist := param[0]["traceType"]; exist { + traceType = strings.ToLower(param[0]["traceType"]) + } + } + // select reporter type switch traceType { case "jaeger": tracer, closer = tracing.InitJaeger(svc) - ctx = tracing.AddTracer(svc, spanName, parent, header, tracer, tags) break case "zipkin": - log.Println("create zipkin tracing job") + tracer, closer = tracing.InitZipkin(svc) break case "skywalking": log.Println("create skywalking tracing job") @@ -71,7 +94,6 @@ func doTask(ch chan context.Context, parent context.Context, default: break } - - defer closer.Close() - ch <- ctx + //return + return tracer, closer } diff --git a/tracer/grpc_tracer.go b/tracer/grpc_tracer.go new file mode 100644 index 0000000..d586df1 --- /dev/null +++ b/tracer/grpc_tracer.go @@ -0,0 +1,120 @@ +package traceandtracego + +import ( + logger "log" + + opentracing "github.com/opentracing/opentracing-go" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +//声明 tracer +var tracer opentracing.Tracer + +//text map reader +type TextMapReader struct { + metadata.MD +} + +//text map writer +type TextMapWriter struct { + metadata.MD +} + +//RPC Client Dial Option +func ClientDialOption(parentTracer opentracing.Tracer) grpc.DialOption { + tracer = parentTracer + return grpc.WithUnaryInterceptor(grpcClientInterceptor) +} + +//RPC Server Dial Option +func ServerDialOption(tracer opentracing.Tracer) grpc.ServerOption { + return grpc.UnaryInterceptor(grpcServerInterceptor) +} + +//RPC Client 拦截器 +func grpcClientInterceptor( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption) (err error) { + + //从context中获取metadata + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + md = metadata.New(nil) + } else { + //如果对metadata进行修改,那么需要用拷贝的副本进行修改 + md = md.Copy() + } + //carrier := opentracing.TextMapCarrier{} + carrier := TextMapWriter{md} + //父类 context + var currentContext opentracing.SpanContext + //从 context 中获取原始的 span + parentSpan := opentracing.SpanFromContext(ctx) + if parentSpan != nil { + currentContext = parentSpan.Context() + } else { + //否则创建 span + span := tracer.StartSpan(method) + defer span.Finish() + currentContext = span.Context() + } + //将 span 的 context 信息注入到 carrier 中 + e := tracer.Inject(currentContext, opentracing.TextMap, carrier) + if e != nil { + logger.Fatalln("tracer inject failed", e) + } + //创建一个新的 context,把 metadata 附带上 + ctx = metadata.NewOutgoingContext(ctx, md) + return invoker(ctx, method, req, reply, cc, opts...) +} + +//RPC Server 拦截器 +func grpcServerInterceptor( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (resp interface{}, err error) { + //从context中获取metadata。md.(type) == map[string][]string + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + md = metadata.New(nil) + } else { + //如果对metadata进行修改,那么需要用拷贝的副本进行修改。(FromIncomingContext的注释) + md = md.Copy() + } + carrier := TextMapReader{md} + tracer := opentracing.GlobalTracer() + spanContext, e := tracer.Extract(opentracing.TextMap, carrier) + if e != nil { + logger.Fatalln("extract span context err", e) + } + + span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext)) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + + return handler(ctx, req) +} + +//text map writer set +func (t TextMapWriter) Set(key, val string) { + t.MD[key] = append(t.MD[key], val) +} + +// 读取 metadata 中的 span 信息 +func (t TextMapReader) ForeachKey(handler func(key, val string) error) error { //不能是指针 + for key, val := range t.MD { + for _, v := range val { + if err := handler(key, v); err != nil { + return err + } + } + } + return nil +} diff --git a/tracer/http_tracer.go b/tracer/http_tracer.go new file mode 100644 index 0000000..3c78e19 --- /dev/null +++ b/tracer/http_tracer.go @@ -0,0 +1,128 @@ +package traceandtracego + +import ( + "log" + logger "log" + "net/http" + "strconv" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + spanLog "github.com/opentracing/opentracing-go/log" + "golang.org/x/net/context" +) + +// add jaeger http tracer +func AddHttpTracer(svcName, spanName string, + ctx context.Context, + header http.Header, + tracer opentracing.Tracer, + tags map[string]string) context.Context { + //初始化 tracer + opentracing.InitGlobalTracer(tracer) + var sp opentracing.Span + //从 header 中获取 span + spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(header)) + if spanCtx != nil { + sp = opentracing.GlobalTracer().StartSpan(spanName, opentracing.ChildOf(spanCtx)) + } else { + //如果 header 中没有携带 context, 则新建 span + sp = tracer.StartSpan(spanName) + } + //写入 tag 或者 日志 + for k, v := range tags { + sp.LogFields( + spanLog.String(k, v), + ) + } + + // setting ext + if _, exist := tags["spanKind"]; exist { + // enum: client,server,producer,consumer + ext.SpanKind.Set(sp, ext.SpanKindEnum(tags["spanKind"])) + } + + if _, exist := tags["component"]; exist { + ext.Component.Set(sp, tags["component"]) + } + + if _, exist := tags["samplingPriority"]; exist { + spUint, err := strconv.Atoi(tags["samplingPriority"]) + if err != nil { + log.Fatalf("sampling priority strconv error %v", err) + } + ext.SamplingPriority.Set(sp, uint16(spUint)) + } + + if _, exist := tags["peerService"]; exist { + ext.PeerService.Set(sp, tags["peerService"]) + } + + if _, exist := tags["peerAddress"]; exist { + ext.PeerAddress.Set(sp, tags["peerAddress"]) + } + + if _, exist := tags["peerHostname"]; exist { + ext.PeerHostname.Set(sp, tags["peerHostname"]) + } + + if _, exist := tags["peerIpv4"]; exist { + pi, _ := strconv.ParseUint(tags["peerIpv4"], 10, 32) + ext.PeerHostIPv4.Set(sp, uint32(pi)) + } + + if _, exist := tags["peerIpv6"]; exist { + ext.PeerHostIPv6.Set(sp, tags["peerIpv6"]) + } + + if _, exist := tags["peerPort"]; exist { + ppInt, _ := strconv.Atoi(tags["peerPort"]) + ext.PeerPort.Set(sp, uint16(ppInt)) + } + + if _, exist := tags["httpUrl"]; exist { + ext.HTTPUrl.Set(sp, tags["httpUrl"]) + } + + if _, exist := tags["httpStatusCode"]; exist { + hscInt, _ := strconv.Atoi(tags["httpStatusCode"]) + ext.HTTPStatusCode.Set(sp, uint16(hscInt)) + } + + if _, exist := tags["dbStatement"]; exist { + ext.DBStatement.Set(sp, tags["dbStatement"]) + } + + if _, exist := tags["dbInstance"]; exist { + ext.DBInstance.Set(sp, tags["dbInstance"]) + } + + if _, exist := tags["dbType"]; exist { + ext.DBType.Set(sp, tags["dbType"]) + } + + if _, exist := tags["httpMethod"]; exist { + ext.HTTPMethod.Set(sp, tags["httpMethod"]) + } + + if _, exist := tags["dbUser"]; exist { + ext.DBUser.Set(sp, tags["dbUser"]) + } + + if _, exist := tags["messageBusDestination"]; exist { + ext.MessageBusDestination.Set(sp, tags["messageBusDestination"]) + } + + //注入span (用于传递) + if err := opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(header)); err != nil { + logger.Fatalln("inject failed", err) + } + //关闭连接 + defer sp.Finish() + //返回带有 span 的 context + return opentracing.ContextWithSpan(ctx, sp) +} diff --git a/tracer/jaeger.go b/tracer/jaeger.go index ecbc3f8..aaf7cb2 100644 --- a/tracer/jaeger.go +++ b/tracer/jaeger.go @@ -2,32 +2,15 @@ package traceandtracego import ( "io" - "log" logger "log" - "net/http" "os" "strconv" opentracing "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - spanLog "github.com/opentracing/opentracing-go/log" jaeger "github.com/uber/jaeger-client-go" jaegercfg "github.com/uber/jaeger-client-go/config" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" ) -//声明 tracer -var tracer opentracing.Tracer -var ctxShare context.Context -var rpcCtx string -var sf = 100 - -type TextMapReader struct { - metadata.MD -} - //初始化 Jaeger func InitJaeger(service string) (opentracing.Tracer, io.Closer) { // trace default setting @@ -64,229 +47,12 @@ func InitJaeger(service string) (opentracing.Tracer, io.Closer) { } //write sub span -func WriteSubSpan(span opentracing.Span, subSpanName string) { - //use context - ctx := context.Background() - ctx = opentracing.ContextWithSpan(ctx, span) - //其他过程获取并开始子 span - newSpan, _ := opentracing.StartSpanFromContext(ctx, subSpanName) - //StartSpanFromContext 会将新span保存到ctx中更新 - defer newSpan.Finish() -} - -// TracerWrapper tracer wrapper -func AddTracer(svcName, spanName string, - ctx context.Context, - header http.Header, - tracer opentracing.Tracer, - tags map[string]string) context.Context { - //初始化 tracer - opentracing.InitGlobalTracer(tracer) - var sp opentracing.Span - //从 header 中获取 span - spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(header)) - if spanCtx != nil { - sp = opentracing.GlobalTracer().StartSpan(spanName, opentracing.ChildOf(spanCtx)) - } else { - //如果 header 中没有携带 context, 则新建 span - sp = tracer.StartSpan(spanName) - } - //写入 tag 或者 日志 - for k, v := range tags { - sp.LogFields( - spanLog.String(k, v), - ) - } - - // setting ext - if _, exist := tags["spanKind"]; exist { - // enum: client,server,producer,consumer - ext.SpanKind.Set(sp, ext.SpanKindEnum(tags["spanKind"])) - } - - if _, exist := tags["component"]; exist { - ext.Component.Set(sp, tags["component"]) - } - - if _, exist := tags["samplingPriority"]; exist { - spUint, err := strconv.Atoi(tags["samplingPriority"]) - if err != nil { - log.Fatalf("sampling priority strconv error %v", err) - } - ext.SamplingPriority.Set(sp, uint16(spUint)) - } - - if _, exist := tags["peerService"]; exist { - ext.PeerService.Set(sp, tags["peerService"]) - } - - if _, exist := tags["peerAddress"]; exist { - ext.PeerAddress.Set(sp, tags["peerAddress"]) - } - - if _, exist := tags["peerHostname"]; exist { - ext.PeerHostname.Set(sp, tags["peerHostname"]) - } - - if _, exist := tags["peerIpv4"]; exist { - pi, _ := strconv.ParseUint(tags["peerIpv4"], 10, 32) - ext.PeerHostIPv4.Set(sp, uint32(pi)) - } - - if _, exist := tags["peerIpv6"]; exist { - ext.PeerHostIPv6.Set(sp, tags["peerIpv6"]) - } - - if _, exist := tags["peerPort"]; exist { - ppInt, _ := strconv.Atoi(tags["peerPort"]) - ext.PeerPort.Set(sp, uint16(ppInt)) - } - - if _, exist := tags["httpUrl"]; exist { - ext.HTTPUrl.Set(sp, tags["httpUrl"]) - } - - if _, exist := tags["httpStatusCode"]; exist { - hscInt, _ := strconv.Atoi(tags["httpStatusCode"]) - ext.HTTPStatusCode.Set(sp, uint16(hscInt)) - } - - if _, exist := tags["dbStatement"]; exist { - ext.DBStatement.Set(sp, tags["dbStatement"]) - } - - if _, exist := tags["dbInstance"]; exist { - ext.DBInstance.Set(sp, tags["dbInstance"]) - } - - if _, exist := tags["dbType"]; exist { - ext.DBType.Set(sp, tags["dbType"]) - } - - if _, exist := tags["httpMethod"]; exist { - ext.HTTPMethod.Set(sp, tags["httpMethod"]) - } - - if _, exist := tags["dbUser"]; exist { - ext.DBUser.Set(sp, tags["dbUser"]) - } - - if _, exist := tags["messageBusDestination"]; exist { - ext.MessageBusDestination.Set(sp, tags["messageBusDestination"]) - } - - //注入span (用于传递) - if err := opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(header)); err != nil { - logger.Fatalln("inject failed", err) - } - //关闭连接 - defer sp.Finish() - //返回带有 span 的 context - return opentracing.ContextWithSpan(ctx, sp) -} - -//RPC Client Dial Option -func ClientDialOption(parentTracer opentracing.Tracer) grpc.DialOption { - tracer = parentTracer - return grpc.WithUnaryInterceptor(grpcClientInterceptor) -} - -//text map writer -type TextMapWriter struct { - metadata.MD -} - -//text map writer set -func (t TextMapWriter) Set(key, val string) { - t.MD[key] = append(t.MD[key], val) -} - -//RPC Client 拦截器 -func grpcClientInterceptor( - ctx context.Context, - method string, - req, reply interface{}, - cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, - opts ...grpc.CallOption) (err error) { - - //从context中获取metadata - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - md = metadata.New(nil) - } else { - //如果对metadata进行修改,那么需要用拷贝的副本进行修改 - md = md.Copy() - } - //carrier := opentracing.TextMapCarrier{} - carrier := TextMapWriter{md} - //父类 context - var currentContext opentracing.SpanContext - //从 context 中获取原始的 span - parentSpan := opentracing.SpanFromContext(ctx) - if parentSpan != nil { - currentContext = parentSpan.Context() - } else { - //否则创建 span - span := tracer.StartSpan(method) - defer span.Finish() - currentContext = span.Context() - } - //将 span 的 context 信息注入到 carrier 中 - e := tracer.Inject(currentContext, opentracing.TextMap, carrier) - if e != nil { - logger.Fatalln("tracer inject failed", e) - } - //创建一个新的 context,把 metadata 附带上 - ctx = metadata.NewOutgoingContext(ctx, md) - return invoker(ctx, method, req, reply, cc, opts...) -} - -//RPC Server Dial Option -func ServerDialOption(tracer opentracing.Tracer) grpc.ServerOption { - return grpc.UnaryInterceptor(jaegerGrpcServerInterceptor) -} - -//读取metadata中的span信息 -func (t TextMapReader) ForeachKey(handler func(key, val string) error) error { //不能是指针 - for key, val := range t.MD { - for _, v := range val { - if err := handler(key, v); err != nil { - return err - } - } - } - return nil -} - -//RPC Server 拦截器 -func jaegerGrpcServerInterceptor( - ctx context.Context, - req interface{}, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler) (resp interface{}, err error) { - //从context中获取metadata。md.(type) == map[string][]string - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - md = metadata.New(nil) - } else { - //如果对metadata进行修改,那么需要用拷贝的副本进行修改。(FromIncomingContext的注释) - md = md.Copy() - } - carrier := TextMapReader{md} - tracer := opentracing.GlobalTracer() - spanContext, e := tracer.Extract(opentracing.TextMap, carrier) - if e != nil { - logger.Fatalln("extract span context err", e) - } - - span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext)) - defer span.Finish() - ctx = opentracing.ContextWithSpan(ctx, span) - - return handler(ctx, req) -} +// func WriteSubSpan(span opentracing.Span, subSpanName string) { +// //use context +// ctx := context.Background() +// ctx = opentracing.ContextWithSpan(ctx, span) +// //其他过程获取并开始子 span +// newSpan, _ := opentracing.StartSpanFromContext(ctx, subSpanName) +// //StartSpanFromContext 会将新span保存到ctx中更新 +// defer newSpan.Finish() +// } diff --git a/tracer/zipkin.go b/tracer/zipkin.go index da82083..cc95605 100644 --- a/tracer/zipkin.go +++ b/tracer/zipkin.go @@ -1,7 +1,9 @@ package traceandtracego import ( - logger "log" + "log" + "os" + "strconv" opentracing "github.com/opentracing/opentracing-go" zipkinot "github.com/openzipkin-contrib/zipkin-go-opentracing" @@ -12,21 +14,43 @@ import ( //初始化 zipkin func InitZipkin(service string) (opentracing.Tracer, reporter.Reporter) { + // setting trace host + zipkinHost := "http://localhost:9411/api/v2/spans" + if traceAgentHost := os.Getenv("TRACE_AGENT_HOST"); traceAgentHost != "" { + zipkinHost = traceAgentHost + } else if traceCollectorEndpoint := os.Getenv("TRACE_ENDPOINT"); traceCollectorEndpoint != "" { + zipkinHost = traceCollectorEndpoint + } //设置 span reporter - reporter := httpreporter.NewReporter("http://localhost:9411/api/v2/spans") + reporter := httpreporter.NewReporter(zipkinHost) //创建本地 service 节点 endpoint, err := zipkin.NewEndpoint(service, "") //log error if err != nil { - logger.Println("[traceandtrace] [Error] unable to create local endpoint ...", err) + log.Println("unable to create local endpoint ...", err) + } + + var sampleParam uint64 = 1 + // setting trace sampler type param + if traceSamplerParam := os.Getenv("TRACE_SAMPLER_PARAM"); traceSamplerParam != "" { + sampleParam, _ = strconv.ParseUint(traceSamplerParam, 10, 64) } + //setting sampler + sampler := zipkin.NewModuloSampler(sampleParam) + //初始化 tracer - nativeTracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint)) + nativeTracer, err := zipkin.NewTracer( + reporter, + zipkin.WithLocalEndpoint(endpoint), + zipkin.WithSampler(sampler), + ) if err != nil { - logger.Println("[traceandtrace] [Error] unable to create tracer ...", err) + log.Println("unable to create tracer ...", err) } + //wrap tracer tracer := zipkinot.Wrap(nativeTracer) + opentracing.SetGlobalTracer(tracer) //返回 trace & reporter return tracer, reporter } diff --git a/wiki/imgs/goroutine.png b/wiki/imgs/goroutine.png new file mode 100644 index 0000000..4580d0d Binary files /dev/null and b/wiki/imgs/goroutine.png differ diff --git a/wiki/imgs/http_client_2.jpg b/wiki/imgs/http_client_2.jpg new file mode 100644 index 0000000..e91cb15 Binary files /dev/null and b/wiki/imgs/http_client_2.jpg differ diff --git a/wiki/imgs/httptogrpc_client.jpg b/wiki/imgs/httptogrpc_client.jpg index 7b747f5..d79de68 100644 Binary files a/wiki/imgs/httptogrpc_client.jpg and b/wiki/imgs/httptogrpc_client.jpg differ diff --git a/wrapper.go b/wrapper.go index 01bd928..d77c1d7 100644 --- a/wrapper.go +++ b/wrapper.go @@ -4,6 +4,7 @@ import ( "context" "io" "net/http" + "os" "strings" tracing "github.com/codeandcode0x/traceandtrace-go/tracer" @@ -19,37 +20,38 @@ const ( //Add http tracing , tags is k-v map which can set in span log, param map can set trace type . func AddHttpTracing(svcName, spanName string, header http.Header, tags map[string]string, param ...map[string]string) (context.Context, context.CancelFunc) { - // 定义 trace type + //trace type var traceType string - //启动 trace 任务 + //start trace task ctx, cancel := context.WithCancel(context.Background()) - //创建通道 + //create chan ch := make(chan context.Context, 0) - //选择类型和服务 + //trace type traceType = JAEGER_TRACER - if len(param) > 0 { - if _, exist := param[0]["traceType"]; exist { - traceType = strings.ToLower(param[0]["traceType"]) - } + //get trace type env + if tType := os.Getenv("TRACE_TYPE"); tType != "" { + traceType = tType + } else if _, exist := tags["traceType"]; exist { + traceType = strings.ToLower(tags["traceType"]) } - //创建任务 + //create goroutine job go GenerateTracingJobs(ch, ctx, svcName, spanName, header, tags, traceType) - //返回通道 + //return chan return <-ch, cancel } //add rpc client tracing func AddRpcClientTracing(serviceName string, param ...map[string]string) (grpc.DialOption, io.Closer) { - //初始化 jaeger - tracer, closer := tracing.InitJaeger(serviceName) - //返回 rpc options + //init tracer + tracer, closer := SelectInitTracer(serviceName, param...) + //return rpc options return tracing.ClientDialOption(tracer), closer } //add rpc server tracing func AddRpcServerTracing(serviceName string, param ...map[string]string) (grpc.ServerOption, io.Closer, opentracing.Tracer) { - //初始化 jaeger - tracer, closer := tracing.InitJaeger(serviceName) - //返回 rpc options + //init jaeger + tracer, closer := SelectInitTracer(serviceName, param...) + //return rpc options return tracing.ServerDialOption(tracer), closer, tracer }