diff --git a/profiler/internal/pprofutils/delta.go b/profiler/internal/pprofutils/delta.go index 263668bd9c..f81fa169aa 100644 --- a/profiler/internal/pprofutils/delta.go +++ b/profiler/internal/pprofutils/delta.go @@ -17,8 +17,6 @@ type Delta struct { // SampleTypes limits the delta calcultion to the given sample types. Other // sample types will retain the values of profile b. The defined sample types // must exist in the profile, otherwise derivation will fail with an error. - // If the slice is empty, all sample types are subject to delta profile - // derivation. // // The use case for this for this is to deal with the heap profile which // contains alloc and inuse sample types, but delta profiling makes no sense @@ -30,21 +28,12 @@ type Delta struct { // profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile // a will be mutated by this function. You should pass a copy if that's // undesirable. -// -// Other profiles that should be merged into the resulting profile can be passed -// through the extra parameter. -func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profile.Profile, error) { +func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) { ratios := make([]float64, len(a.SampleType)) found := 0 for i, st := range a.SampleType { - // Empty c.SampleTypes means we calculate the delta for every st - if len(d.SampleTypes) == 0 { - ratios[i] = -1 - continue - } - - // Otherwise we only calcuate the delta for any st that is listed in + // We only calculate the delta for any st that is listed in // c.SampleTypes. st's not listed in there will default to ratio 0, which // means we delete them from pa, so only the pb values remain in the final // profile. @@ -61,10 +50,7 @@ func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profi a.ScaleN(ratios) - profiles := make([]*profile.Profile, 0, 2+len(extra)) - profiles = append(profiles, a, b) - profiles = append(profiles, extra...) 
- delta, err := profile.Merge(profiles) + delta, err := profile.Merge([]*profile.Profile{a, b}) if err != nil { return nil, err } diff --git a/profiler/internal/pprofutils/delta_test.go b/profiler/internal/pprofutils/delta_test.go index c922aa2847..9fdc588718 100644 --- a/profiler/internal/pprofutils/delta_test.go +++ b/profiler/internal/pprofutils/delta_test.go @@ -18,6 +18,7 @@ func TestDelta(t *testing.T) { var deltaText bytes.Buffer profA, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +x/count main;foo 5 main;foo;bar 3 main;foobar 4 @@ -25,13 +26,14 @@ main;foobar 4 require.NoError(t, err) profB, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +x/count main;foo 8 main;foo;bar 3 main;foobar 5 `))) require.NoError(t, err) - delta, err := Delta{}.Convert(profA, profB) + delta, err := Delta{SampleTypes: []ValueType{{Type: "x", Unit: "count"}}}.Convert(profA, profB) require.NoError(t, err) require.NoError(t, Protobuf{}.Convert(delta, &deltaText)) diff --git a/profiler/profile.go b/profiler/profile.go index c939569af7..424753e1c0 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -64,12 +64,13 @@ type profileType struct { // this isn't done due to idiosyncratic filename used by the // GoroutineProfile. Filename string - // SupportsDelta indicates whether delta profiles can be computed for - // this profile type, which is used to determine the final filename - SupportsDelta bool // Collect collects the given profile and returns the data for it. Most // profiles will be in pprof format, i.e. gzip compressed proto buf data. Collect func(p *profiler) ([]byte, error) + // DeltaValues identifies which values in profile samples should be modified + // when delta profiling is enabled. Empty DeltaValues means delta profiling is + // not supported for this profile type + DeltaValues []pprofutils.ValueType } // profileTypes maps every ProfileType to its implementation. 
@@ -109,28 +110,34 @@ var profileTypes = map[ProfileType]profileType{ HeapProfile: { Name: "heap", Filename: "heap.pprof", - Collect: collectGenericProfile("heap", HeapProfile, &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ + Collect: collectGenericProfile("heap", HeapProfile), + DeltaValues: []pprofutils.ValueType{ {Type: "alloc_objects", Unit: "count"}, {Type: "alloc_space", Unit: "bytes"}, - }}), - SupportsDelta: true, + }, }, MutexProfile: { - Name: "mutex", - Filename: "mutex.pprof", - Collect: collectGenericProfile("mutex", MutexProfile, &pprofutils.Delta{}), - SupportsDelta: true, + Name: "mutex", + Filename: "mutex.pprof", + Collect: collectGenericProfile("mutex", MutexProfile), + DeltaValues: []pprofutils.ValueType{ + {Type: "contentions", Unit: "count"}, + {Type: "delay", Unit: "nanoseconds"}, + }, }, BlockProfile: { - Name: "block", - Filename: "block.pprof", - Collect: collectGenericProfile("block", BlockProfile, &pprofutils.Delta{}), - SupportsDelta: true, + Name: "block", + Filename: "block.pprof", + Collect: collectGenericProfile("block", BlockProfile), + DeltaValues: []pprofutils.ValueType{ + {Type: "contentions", Unit: "count"}, + {Type: "delay", Unit: "nanoseconds"}, + }, }, GoroutineProfile: { Name: "goroutine", Filename: "goroutines.pprof", - Collect: collectGenericProfile("goroutine", GoroutineProfile, nil), + Collect: collectGenericProfile("goroutine", GoroutineProfile), }, expGoroutineWaitProfile: { Name: "goroutinewait", @@ -166,9 +173,9 @@ var profileTypes = map[ProfileType]profileType{ }, } -func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) { +func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byte, error) { return func(p *profiler) ([]byte, error) { - var extra []*pprofile.Profile + var extra *pprofile.Profile cAlloc, ok := extensions.GetCAllocationProfiler() switch { case ok && p.cfg.cmemprofEnabled && p.cfg.deltaProfiles && pt == 
HeapProfile: @@ -183,7 +190,7 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) p.interruptibleSleep(p.cfg.period) profile, err := cAlloc.Stop() if err == nil { - extra = append(extra, profile) + extra = profile } default: // In all cases, sleep until the end of the profile @@ -195,18 +202,34 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) var buf bytes.Buffer err := p.lookupProfile(name, &buf, 0) data := buf.Bytes() - if delta == nil || !p.cfg.deltaProfiles { + dp, ok := p.deltas[pt] + if !ok || !p.cfg.deltaProfiles { return data, err } start := time.Now() - delta, err := p.deltaProfile(name, delta, data, extra...) + delta, err := dp.Delta(data) + if err == nil && extra != nil { + extended, err := pprofile.ParseData(delta) + if err != nil { + return nil, err + } + extended, err = pprofile.Merge([]*pprofile.Profile{extended, extra}) + if err != nil { + return nil, err + } + buf.Reset() + if err := extended.Write(&buf); err != nil { + return nil, err + } + delta = buf.Bytes() + } tags := append(p.cfg.tags.Slice(), fmt.Sprintf("profile_type:%s", name)) p.cfg.statsd.Timing("datadog.profiling.go.delta_time", time.Since(start), tags, 1) if err != nil { return nil, fmt.Errorf("delta profile error: %s", err) } - return delta.data, err + return delta, err } } @@ -273,25 +296,37 @@ func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) { tags := append(p.cfg.tags.Slice(), pt.Tag()) filename := t.Filename // TODO(fg): Consider making Collect() return the filename. - if p.cfg.deltaProfiles && t.SupportsDelta { + if p.cfg.deltaProfiles && len(t.DeltaValues) > 0 { filename = "delta-" + filename } p.cfg.statsd.Timing("datadog.profiling.go.collect_time", end.Sub(start), tags, 1) return []*profile{{name: filename, data: data}}, nil } -// deltaProfile derives the delta profile between curData and the previous -// profile. 
If extra profiles are provided, they will be merged into the final -// profile after computing the delta profile. -func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []byte, extra ...*pprofile.Profile) (*profile, error) { +type deltaProfiler struct { + delta pprofutils.Delta + prev *pprofile.Profile +} + +// newDeltaProfiler returns an initialized deltaProfiler. Deltas are computed +// only for the given value types (e.g. "alloc_space", "alloc_objects"). If no +// value types are given, no deltas are computed and current values are kept. +func newDeltaProfiler(v ...pprofutils.ValueType) *deltaProfiler { + return &deltaProfiler{ + delta: pprofutils.Delta{SampleTypes: v}, + } +} + +// Delta derives the delta profile between curData and the profile passed to the +// previous call to Delta. The first call to Delta will return the profile +// unchanged. +func (d *deltaProfiler) Delta(curData []byte) ([]byte, error) { curProf, err := pprofile.ParseData(curData) if err != nil { return nil, fmt.Errorf("delta prof parse: %v", err) } var deltaData []byte - p.mu.Lock() - prevProf := p.prev[name] - p.mu.Unlock() + prevProf := d.prev if prevProf == nil { // First time deltaProfile gets called for a type, there is no prevProf. In // this case we emit the current profile as a delta profile. @@ -301,7 +336,7 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData [] // Unfortunately the core implementation isn't resuable via a API, so we do // our own delta calculation below. // https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304 - deltaProf, err := delta.Convert(prevProf, curProf, extra...) 
+ deltaProf, err := d.delta.Convert(prevProf, curProf) if err != nil { return nil, fmt.Errorf("delta prof merge: %v", err) } @@ -318,10 +353,8 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData [] } // Keep the most recent profiles in memory for future diffing. This needs to // be taken into account when enforcing memory limits going forward. - p.mu.Lock() - p.prev[name] = curProf - p.mu.Unlock() - return &profile{data: deltaData}, nil + d.prev = curProf + return deltaData, nil } func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) { diff --git a/profiler/profile_test.go b/profiler/profile_test.go index abe2caf25e..ba25f82f9d 100644 --- a/profiler/profile_test.go +++ b/profiler/profile_test.go @@ -43,29 +43,29 @@ func TestRunProfile(t *testing.T) { Prof1: textProfile{ Time: timeA, Text: ` -stuff/count -main 3 -main;bar 2 -main;foo 5 +contentions/count delay/nanoseconds +main 3 1 +main;bar 2 1 +main;foo 5 1 `, }, Prof2: textProfile{ Time: timeB, Text: ` -stuff/count -main 4 -main;bar 2 -main;foo 8 -main;foobar 7 +contentions/count delay/nanoseconds +main 4 1 +main;bar 2 1 +main;foo 8 1 +main;foobar 7 1 `, }, WantDelta: textProfile{ Time: timeA, Text: ` -stuff/count -main;foobar 7 -main;foo 3 -main 1 +contentions/count delay/nanoseconds +main;foobar 7 1 +main;foo 3 0 +main 1 0 `, }, WantDuration: deltaPeriod, @@ -115,7 +115,7 @@ main;bar 0 0 8 16 // followed by prof2 when calling runProfile(). deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) { returnProfs := [][]byte{prof1, prof2} - opts = append(opts, WithPeriod(5*time.Millisecond)) + opts = append(opts, WithPeriod(5*time.Millisecond), WithProfileTypes(HeapProfile, MutexProfile, BlockProfile)) p, err := unstartedProfiler(opts...) 
p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error { _, err := w.Write(returnProfs[0]) diff --git a/profiler/profiler.go b/profiler/profiler.go index a9d6adab42..bb0610216e 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -20,8 +20,6 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - pprofile "github.com/google/pprof/profile" ) // outChannelSize specifies the size of the profile output channel. @@ -64,15 +62,14 @@ func Stop() { // profiler collects and sends preset profiles to the Datadog API at a given frequency // using a given configuration. type profiler struct { - mu sync.Mutex - cfg *config // profile configuration - out chan batch // upload queue - uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests - exit chan struct{} // exit signals the profiler to stop; it is closed after stopping - stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. - wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. - met *metrics // metric collector state - prev map[string]*pprofile.Profile // previous collection results for delta profiling + cfg *config // profile configuration + out chan batch // upload queue + uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests + exit chan struct{} // exit signals the profiler to stop; it is closed after stopping + stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. + wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. 
+ met *metrics // metric collector state + deltas map[ProfileType]*deltaProfiler telemetry *telemetry.Client seq uint64 // seq is the value of the profile_seq tag pendingProfiles sync.WaitGroup // signal that profile collection is done, for stopping CPU profiling @@ -182,11 +179,16 @@ func newProfiler(opts ...Option) (*profiler, error) { } p := profiler{ - cfg: cfg, - out: make(chan batch, outChannelSize), - exit: make(chan struct{}), - met: newMetrics(), - prev: make(map[string]*pprofile.Profile), + cfg: cfg, + out: make(chan batch, outChannelSize), + exit: make(chan struct{}), + met: newMetrics(), + deltas: make(map[ProfileType]*deltaProfiler), + } + for pt := range cfg.types { + if d := profileTypes[pt].DeltaValues; len(d) > 0 { + p.deltas[pt] = newDeltaProfiler(d...) + } } p.uploadFunc = p.upload p.telemetry = &telemetry.Client{