diff --git a/.github/workflows/plugin-tests.yaml b/.github/workflows/plugin-tests.yaml index 40801e52..107e83cf 100644 --- a/.github/workflows/plugin-tests.yaml +++ b/.github/workflows/plugin-tests.yaml @@ -80,7 +80,7 @@ jobs: - go-redisv9 - go-restfulv3 - gorm - - kratosv2 +# - kratosv2 temporary disable because it's not stable - microv4 - mongo - mysql diff --git a/CHANGES.md b/CHANGES.md index 29b9108b..ba085553 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. * Support Windows plugin test. * Support Kafka reporter. * Add recover to goroutine to prevent unexpected panics. +* Add mutex to fix some data race. #### Plugins diff --git a/plugins/core/context.go b/plugins/core/context.go index fe1c561d..9721c279 100644 --- a/plugins/core/context.go +++ b/plugins/core/context.go @@ -17,7 +17,10 @@ package core -import "reflect" +import ( + "reflect" + "sync" +) var ( GetGLS = func() interface{} { return nil } @@ -29,16 +32,21 @@ var ( ) type ContextSnapshoter interface { - TakeSnapShot(val interface{}) interface{} + TakeSnapShot() interface{} } type TracingContext struct { activeSpan TracingSpan Runtime *RuntimeContext ID *IDContext + + activeSpanLock sync.RWMutex } -func (t *TracingContext) TakeSnapShot(val interface{}) interface{} { +func (t *TracingContext) TakeSnapShot() interface{} { + if t == nil { + return nil + } snapshot := newSnapshotSpan(t.ActiveSpan()) return &TracingContext{ activeSpan: snapshot, @@ -48,6 +56,8 @@ func (t *TracingContext) TakeSnapShot(val interface{}) interface{} { } func (t *TracingContext) ActiveSpan() TracingSpan { + t.activeSpanLock.RLock() + defer t.activeSpanLock.RUnlock() if t.activeSpan == nil || reflect.ValueOf(t.activeSpan).IsZero() { return nil } @@ -55,6 +65,8 @@ func (t *TracingContext) ActiveSpan() TracingSpan { } func (t *TracingContext) SaveActiveSpan(s TracingSpan) { + t.activeSpanLock.Lock() + defer t.activeSpanLock.Unlock() t.activeSpan = s } diff --git a/plugins/core/metrics.go b/plugins/core/metrics.go index 5e0fd8a5..ddac85c3 100644 --- a/plugins/core/metrics.go +++ b/plugins/core/metrics.go @@ -75,7 +75,7 @@ func (t *Tracer) reachNotInitMetrics() { func (t *Tracer) sendMetrics() { meters := make([]reporter.ReportedMeter, 0) // call collect hook - for _, hook := range t.meterCollectListeners { + for _, hook := range t.allMeterCollectListeners() { hook() } t.meterMap.Range(func(key, value interface{}) bool { @@ -90,6 +90,14 @@ func (t *Tracer) sendMetrics() { t.Reporter.SendMetrics(meters) } +func (t *Tracer) allMeterCollectListeners() []func() { + t.meterCollectListenersLock.RLock() + defer t.meterCollectListenersLock.RUnlock() + listeners := make([]func(), 0, len(t.meterCollectListeners)) + listeners = append(listeners, t.meterCollectListeners...) + return listeners +} + func (t *Tracer) NewCounter(name string, opt interface{}) interface{} { counter := newCounter(name, nil, 0) if o, ok := opt.(meterOpts); ok && o != nil { @@ -118,6 +126,8 @@ func (t *Tracer) NewHistogram(name string, minValue float64, steps []float64, op } func (t *Tracer) AddCollectHook(f func()) { + t.meterCollectListenersLock.Lock() + defer t.meterCollectListenersLock.Unlock() t.meterCollectListeners = append(t.meterCollectListeners, f) } @@ -347,7 +357,7 @@ func (h *histogramBucket) Bucket() float64 { } func (h *histogramBucket) Count() int64 { - return *h.value + return atomic.LoadInt64(h.value) } func (h *histogramBucket) IsNegativeInfinity() bool { diff --git a/plugins/core/operator/invocation.go b/plugins/core/operator/invocation.go index 8241ca81..83fee062 100644 --- a/plugins/core/operator/invocation.go +++ b/plugins/core/operator/invocation.go @@ -27,6 +27,10 @@ type realInvocation struct { returnValues []interface{} context interface{} + + // self obs + interTimeCost int64 + beforeInterStart int64 } func (i *realInvocation) CallerInstance() interface{} { diff --git a/plugins/core/sampler.go b/plugins/core/sampler.go index c9fb6ba9..282f288c 100644 --- a/plugins/core/sampler.go +++ b/plugins/core/sampler.go @@ -107,10 +107,13 @@ type DynamicSampler struct { currentRate float64 defaultRate float64 sampler Sampler + locker sync.RWMutex } // IsSampled implements IsSampled() of Sampler. func (s *DynamicSampler) IsSampled(operation string) bool { + s.locker.RLock() + defer s.locker.RUnlock() return s.sampler.IsSampled(operation) } @@ -136,11 +139,15 @@ func (s *DynamicSampler) Notify(eventType reporter.AgentConfigEventType, newValu } else { sampler = NewRandomSampler(samplingRate) } + s.locker.Lock() + defer s.locker.Unlock() s.sampler = sampler s.currentRate = samplingRate } func (s *DynamicSampler) Value() string { + s.locker.RLock() + defer s.locker.RUnlock() return fmt.Sprintf("%f", s.currentRate) } diff --git a/plugins/core/span_tracing.go b/plugins/core/span_tracing.go index 4a4b6aca..87edd7cd 100644 --- a/plugins/core/span_tracing.go +++ b/plugins/core/span_tracing.go @@ -255,12 +255,12 @@ func (rs *RootSegmentSpan) AsyncFinish() { } func (rs *RootSegmentSpan) end0() { - go func() { - defer func() { - _ = recover() - }() - rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1) + defer func() { + _ = recover() }() + if rs != nil && rs.doneCh != nil && rs.SegmentContext.refNum != nil { + rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1) + } } func (rs *RootSegmentSpan) createRootSegmentContext(ctx *TracingContext, _ SegmentSpan) (err error) { diff --git a/plugins/core/test_base.go b/plugins/core/test_base.go index 5fa2ae3d..749a6533 100644 --- a/plugins/core/test_base.go +++ b/plugins/core/test_base.go @@ -57,7 +57,7 @@ func SetAsNewGoroutine() { return } if e := gls.(ContextSnapshoter); e != nil { - SetGLS(e.TakeSnapShot(GetGLS())) + SetGLS(e.TakeSnapShot()) } } diff --git a/plugins/core/tracer.go b/plugins/core/tracer.go index 184f5c84..3090502c 100644 --- a/plugins/core/tracer.go +++ b/plugins/core/tracer.go @@ -49,11 +49,12 @@ type Tracer struct { // for plugin tools tools *TracerTools // for all metrics - meterMap *sync.Map - meterCollectListeners []func() - ignoreSuffix []string - traceIgnorePath []string - mu sync.Mutex + meterMap *sync.Map + meterCollectListeners []func() + meterCollectListenersLock sync.RWMutex + ignoreSuffix []string + traceIgnorePath []string + mu sync.Mutex } func (t *Tracer) Init(entity *reporter.Entity, rep reporter.Reporter, samp Sampler, logger operator.LogOperator, diff --git a/plugins/core/tracing.go b/plugins/core/tracing.go index 789682ce..65f85615 100644 --- a/plugins/core/tracing.go +++ b/plugins/core/tracing.go @@ -172,6 +172,8 @@ func (t *Tracer) ContinueContext(snapshot interface{}) { ctx = NewTracingContext() SetGLS(ctx) } + ctx.activeSpanLock.Lock() + defer ctx.activeSpanLock.Unlock() ctx.activeSpan = snap.activeSpan ctx.Runtime = snap.runtime } diff --git a/tools/go-agent/instrument/plugins/enhance_method.go b/tools/go-agent/instrument/plugins/enhance_method.go index b6d918c5..fe448483 100644 --- a/tools/go-agent/instrument/plugins/enhance_method.go +++ b/tools/go-agent/instrument/plugins/enhance_method.go @@ -153,8 +153,6 @@ func (m *MethodEnhance) BuildForDelegator() []dst.Decl { result := make([]dst.Decl, 0) result = append(result, tools.GoStringToDecls(fmt.Sprintf(`var %s = &%s{}`, m.InterceptorVarName, m.InterceptorGeneratedName))...) - result = append(result, tools.GoStringToDecls(fmt.Sprintf(`var %s_interTimeCost int64`, m.FuncID))...) - result = append(result, tools.GoStringToDecls(fmt.Sprintf(`var %s_beforeInterStart int64`, m.FuncID))...) preFunc := &dst.FuncDecl{ Name: &dst.Ident{Name: m.AdapterPreFuncName}, Type: &dst.FuncType{ diff --git a/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl b/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl index fefe85e4..26238e85 100644 --- a/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl +++ b/tools/go-agent/instrument/plugins/templates/method_intercept_after.tmpl @@ -4,8 +4,8 @@ defer func() { // log error log.Errorf("execute interceptor after invoke error, instrument name: %s, interceptor name: %s, function ID: %s, error: %v, stack: %s", "{{.InstrumentName}}", "{{.InterceptorDefineName}}", "{{.FuncID}}", r, tracing.DebugStack()) - {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart - operator.DurationOfInterceptor({{.FuncID}}_interTimeCost) + invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart + operator.DurationOfInterceptor(invocation.interTimeCost) } }() @@ -28,5 +28,5 @@ if (invocation.isContinue) { } {{- end }} } -{{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart -operator.DurationOfInterceptor({{.FuncID}}_interTimeCost) \ No newline at end of file +invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart +operator.DurationOfInterceptor(invocation.interTimeCost) \ No newline at end of file diff --git a/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl b/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl index 6398dccc..a2c17dd9 100644 --- a/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl +++ b/tools/go-agent/instrument/plugins/templates/method_intercept_before.tmpl @@ -1,15 +1,15 @@ -{{.FuncID}}_interTimeCost = int64(0) -{{.FuncID}}_beforeInterStart = operator.NanoTime() +invocation = &operator.realInvocation{} +invocation.interTimeCost = int64(0) +invocation.beforeInterStart = operator.NanoTime() defer func() { if err := recover(); err != nil { operator.ErrorOfPlugin("{{.InstrumentName}}") // log error log.Errorf("execute interceptor before invoke error, instrument name: %s, interceptor name: %s, function ID: %s, error: %v, stack: %s", "{{.InstrumentName}}", "{{.InterceptorDefineName}}", "{{.FuncID}}", err, tracing.DebugStack()) - {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart + invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart } }() -invocation = &operator.realInvocation{} {{if .Recvs -}} invocation.callerInstance = *recv_0 // for caller if exist {{- end}} @@ -37,7 +37,7 @@ if err := {{.InterceptorVarName}}.BeforeInvoke(invocation); err != nil { // using go2sky log error log.Warnf("execute interceptor before invoke error, instrument name: %s, interceptor name: %s, function ID: %s, error: %v", "{{.InstrumentName}}", "{{.InterceptorDefineName}}", "{{.FuncID}}", err) - {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart + invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart return {{ range $index, $value := .Results -}} def_res_{{$index}}, @@ -49,13 +49,13 @@ if (invocation.isContinue) { def_res_{{$index}} = (invocation.returnValues[{{$index}}]).({{$value.PackagedTypeName}}) } {{- end}} - {{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart + invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart return {{ range $index, $value := .Results -}} def_res_{{$index}}, {{- end}}invocation, true } -{{.FuncID}}_interTimeCost += operator.NanoTime() - {{.FuncID}}_beforeInterStart +invocation.interTimeCost += operator.NanoTime() - invocation.beforeInterStart return {{ range $index, $value := .Results -}} def_res_{{$index}}, diff --git a/tools/go-agent/instrument/runtime/instrument.go b/tools/go-agent/instrument/runtime/instrument.go index 6f98d2a6..878c8495 100644 --- a/tools/go-agent/instrument/runtime/instrument.go +++ b/tools/go-agent/instrument/runtime/instrument.go @@ -253,15 +253,15 @@ func _skywalking_metrics_hook_append_impl(f func()) { } type ContextSnapshoter interface { - TakeSnapShot(val interface{}) interface{} + TakeSnapShot() interface{} } func goroutineChange(tls interface{}) interface{} { if tls == nil { return nil } - if taker, ok := tls.(ContextSnapshoter); ok { - return taker.TakeSnapShot(tls) + if taker, ok := tls.(ContextSnapshoter); ok && taker != nil { + return taker.TakeSnapShot() } return tls }