Skip to content

Commit

Permalink
feature: enable grpc tracing (#531)
Browse files (browse the repository at this point in the history)
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed Aug 11, 2021
1 parent e464523 commit c1c3d65
Show file tree
Hide file tree
Showing 15 changed files with 93 additions and 55 deletions.
7 changes: 5 additions & 2 deletions client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
Expand Down Expand Up @@ -86,8 +87,8 @@ func newFilePeerTask(ctx context.Context,

logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag, request.PeerId)
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()
Expand All @@ -104,8 +105,10 @@ func newFilePeerTask(ctx context.Context,
needBackSource = true
// cannot detect source or scheduler error; create a new dummy scheduler client
schedulerClient = &dummySchedulerClient{}
result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
}

if result == nil {
defer span.End()
span.RecordError(err)
Expand Down
12 changes: 5 additions & 7 deletions client/daemon/peer/peertask_file_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
Expand All @@ -27,7 +26,6 @@ import (
)

type filePeerTaskCallback struct {
ctx context.Context
ptm *peerTaskManager
pt *filePeerTask
req *FilePeerTaskRequest
Expand All @@ -42,7 +40,7 @@ func (p *filePeerTaskCallback) GetStartTime() time.Time {

func (p *filePeerTaskCallback) Init(pt Task) error {
// prepare storage
err := p.ptm.storageManager.RegisterTask(p.ctx,
err := p.ptm.storageManager.RegisterTask(p.pt.ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -60,7 +58,7 @@ func (p *filePeerTaskCallback) Init(pt Task) error {

func (p *filePeerTaskCallback) Update(pt Task) error {
// update storage
err := p.ptm.storageManager.UpdateTask(p.ctx,
err := p.ptm.storageManager.UpdateTask(p.pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: pt.GetPeerID(),
Expand All @@ -79,7 +77,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
var cost = time.Now().Sub(p.start).Milliseconds()
pt.Log().Infof("file peer task done, cost: %dms", cost)
e := p.ptm.storageManager.Store(
context.Background(),
p.pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -93,7 +91,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -118,7 +116,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand Down
2 changes: 0 additions & 2 deletions client/daemon/peer/peertask_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
pt.needBackSource = true

pt.SetCallback(&filePeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down Expand Up @@ -252,7 +251,6 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
pt.needBackSource = true

pt.SetCallback(&filePeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down
2 changes: 0 additions & 2 deletions client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
return nil, tiny, nil
}
pt.SetCallback(&filePeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down Expand Up @@ -224,7 +223,6 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu

pt.SetCallback(
&streamPeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down
7 changes: 4 additions & 3 deletions client/daemon/peer/peertask_reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
"io"
"time"

"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
)

var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
Expand Down
7 changes: 5 additions & 2 deletions client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
Expand Down Expand Up @@ -69,8 +70,8 @@ func newStreamPeerTask(ctx context.Context,
logger.Debugf("request overview, pid: %s, url: %s, filter: %s, meta: %s, tag: %s",
request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag)
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()
Expand All @@ -87,8 +88,10 @@ func newStreamPeerTask(ctx context.Context,
needBackSource = true
// cannot detect source or scheduler error; create a new dummy scheduler client
schedulerClient = &dummySchedulerClient{}
result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
}

if result == nil {
defer span.End()
span.RecordError(err)
Expand Down
12 changes: 5 additions & 7 deletions client/daemon/peer/peertask_stream_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
Expand All @@ -27,7 +26,6 @@ import (
)

type streamPeerTaskCallback struct {
ctx context.Context
ptm *peerTaskManager
pt *streamPeerTask
req *scheduler.PeerTaskRequest
Expand All @@ -42,7 +40,7 @@ func (p *streamPeerTaskCallback) GetStartTime() time.Time {

func (p *streamPeerTaskCallback) Init(pt Task) error {
// prepare storage
err := p.ptm.storageManager.RegisterTask(p.ctx,
err := p.ptm.storageManager.RegisterTask(p.pt.ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -59,7 +57,7 @@ func (p *streamPeerTaskCallback) Init(pt Task) error {

func (p *streamPeerTaskCallback) Update(pt Task) error {
// update storage
err := p.ptm.storageManager.UpdateTask(p.ctx,
err := p.ptm.storageManager.UpdateTask(p.pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: pt.GetPeerID(),
Expand All @@ -78,7 +76,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
var cost = time.Now().Sub(p.start).Milliseconds()
pt.Log().Infof("stream peer task done, cost: %dms", cost)
e := p.ptm.storageManager.Store(
context.Background(),
p.pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -91,7 +89,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -116,7 +114,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand Down
2 changes: 0 additions & 2 deletions client/daemon/peer/peertask_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) {
0)
assert.Nil(err, "new stream peer task")
pt.SetCallback(&streamPeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down Expand Up @@ -237,7 +236,6 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) {
0)
assert.Nil(err, "new stream peer task")
pt.SetCallback(&streamPeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down
13 changes: 7 additions & 6 deletions client/daemon/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ import (
"sync"
"time"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/transport"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
"github.com/go-http-utils/headers"
"github.com/golang/groupcache/lru"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/transport"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

var (
Expand Down
21 changes: 12 additions & 9 deletions cmd/dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ import (
"syscall"
"time"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/dflog/logcore"
"d7y.io/dragonfly/v2/internal/dfpath"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/unit"
"d7y.io/dragonfly/v2/pkg/util/net/iputils"
"d7y.io/dragonfly/v2/version"
"github.com/go-echarts/statsview"
"github.com/go-echarts/statsview/viewer"
"github.com/mitchellh/mapstructure"
Expand All @@ -45,11 +36,22 @@ import (
"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/dflog/logcore"
"d7y.io/dragonfly/v2/internal/dfpath"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/unit"
"d7y.io/dragonfly/v2/pkg/util/net/iputils"
"d7y.io/dragonfly/v2/version"
)

// InitCobra initializes flags binding and common sub cmds.
Expand Down Expand Up @@ -245,6 +247,7 @@ func initJaegerTracer(url string) (func(), error) {
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

return func() {
// Do not make the application hang when it is shutdown.
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.7.3
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/jarcoal/httpmock v1.0.8
github.com/klauspost/compress v1.13.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand All @@ -55,6 +56,7 @@ require (
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0
github.com/ugorji/go v1.1.13 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
Expand All @@ -67,7 +69,7 @@ require (
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
golang.org/x/tools v0.1.4 // indirect
gonum.org/v1/gonum v0.9.3
google.golang.org/grpc v1.36.0
google.golang.org/grpc v1.39.0
google.golang.org/protobuf v1.26.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down

0 comments on commit c1c3d65

Please sign in to comment.