From e32e8ee5d012212a173fe71ba7d6d194a3e71108 Mon Sep 17 00:00:00 2001 From: Nick Ripley Date: Tue, 27 Sep 2022 12:04:45 -0400 Subject: [PATCH 1/4] profiler: wrap delta profiling in a type Wrap up delta profiling in a type with a Delta method. This type holds any information about the previous profile needed to compute the delta with the newest profile. The profiler keeps an instance of type for each profile type which supports delta profiling. This better encapsulates the implementation details of delta profiling, and facilitates upcoming implementation changes. As part of this change, pull the logic for merging in "extra" profiles out of the delta profiling code path. This logic was implemented for C allocation profiling, and the extra data was passed through to delta profiling mainly for efficiency. However, merging in extra data is orthogonal to delta profiling and we can find other ways to make that more efficient if we need to. --- profiler/internal/pprofutils/delta.go | 10 +--- profiler/profile.go | 86 ++++++++++++++++----------- profiler/profile_test.go | 2 +- profiler/profiler.go | 34 ++++++----- 4 files changed, 73 insertions(+), 59 deletions(-) diff --git a/profiler/internal/pprofutils/delta.go b/profiler/internal/pprofutils/delta.go index 263668bd9c..1326db36b7 100644 --- a/profiler/internal/pprofutils/delta.go +++ b/profiler/internal/pprofutils/delta.go @@ -30,10 +30,7 @@ type Delta struct { // profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile // a will be mutated by this function. You should pass a copy if that's // undesirable. -// -// Other profiles that should be merged into the resulting profile can be passed -// through the extra parameter. -func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profile.Profile, error) { +func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) { ratios := make([]float64, len(a.SampleType)) found := 0 @@ -61,10 +58,7 @@ func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profi a.ScaleN(ratios) - profiles := make([]*profile.Profile, 0, 2+len(extra)) - profiles = append(profiles, a, b) - profiles = append(profiles, extra...) - delta, err := profile.Merge(profiles) + delta, err := profile.Merge([]*profile.Profile{a, b}) if err != nil { return nil, err } diff --git a/profiler/profile.go b/profiler/profile.go index c939569af7..703ec3f8b9 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -64,12 +64,13 @@ type profileType struct { // this isn't done due to idiosyncratic filename used by the // GoroutineProfile. Filename string - // SupportsDelta indicates whether delta profiles can be computed for - // this profile type, which is used to determine the final filename - SupportsDelta bool // Collect collects the given profile and returns the data for it. Most // profiles will be in pprof format, i.e. gzip compressed proto buf data. Collect func(p *profiler) ([]byte, error) + // Delta identifies which values in profile samples should be modified + // when delta profiling is enabled, if the profile type supports delta + // profiles + Delta *pprofutils.Delta } // profileTypes maps every ProfileType to its implementation. @@ -109,28 +110,28 @@ var profileTypes = map[ProfileType]profileType{ HeapProfile: { Name: "heap", Filename: "heap.pprof", - Collect: collectGenericProfile("heap", HeapProfile, &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ + Collect: collectGenericProfile("heap", HeapProfile), + Delta: &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ {Type: "alloc_objects", Unit: "count"}, {Type: "alloc_space", Unit: "bytes"}, - }}), - SupportsDelta: true, + }}, }, MutexProfile: { - Name: "mutex", - Filename: "mutex.pprof", - Collect: collectGenericProfile("mutex", MutexProfile, &pprofutils.Delta{}), - SupportsDelta: true, + Name: "mutex", + Filename: "mutex.pprof", + Collect: collectGenericProfile("mutex", MutexProfile), + Delta: &pprofutils.Delta{}, }, BlockProfile: { - Name: "block", - Filename: "block.pprof", - Collect: collectGenericProfile("block", BlockProfile, &pprofutils.Delta{}), - SupportsDelta: true, + Name: "block", + Filename: "block.pprof", + Collect: collectGenericProfile("block", BlockProfile), + Delta: &pprofutils.Delta{}, }, GoroutineProfile: { Name: "goroutine", Filename: "goroutines.pprof", - Collect: collectGenericProfile("goroutine", GoroutineProfile, nil), + Collect: collectGenericProfile("goroutine", GoroutineProfile), }, expGoroutineWaitProfile: { Name: "goroutinewait", @@ -166,9 +167,9 @@ var profileTypes = map[ProfileType]profileType{ }, } -func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) { +func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byte, error) { return func(p *profiler) ([]byte, error) { - var extra []*pprofile.Profile + var extra *pprofile.Profile cAlloc, ok := extensions.GetCAllocationProfiler() switch { case ok && p.cfg.cmemprofEnabled && p.cfg.deltaProfiles && pt == HeapProfile: @@ -183,7 +184,7 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) p.interruptibleSleep(p.cfg.period) profile, err := cAlloc.Stop() if err == nil { - extra = append(extra, profile) + extra = profile } default: // In all cases, sleep until the end of the profile @@ -195,18 +196,34 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) var buf bytes.Buffer err := p.lookupProfile(name, &buf, 0) data := buf.Bytes() - if delta == nil || !p.cfg.deltaProfiles { + dp, ok := p.deltas[name] + if !ok || !p.cfg.deltaProfiles { return data, err } start := time.Now() - delta, err := p.deltaProfile(name, delta, data, extra...) + delta, err := dp.Delta(data) + if err == nil && extra != nil { + extended, err := pprofile.ParseData(delta) + if err != nil { + return nil, err + } + extended, err = pprofile.Merge([]*pprofile.Profile{extended, extra}) + if err != nil { + return nil, err + } + buf.Reset() + if err := extended.Write(&buf); err != nil { + return nil, err + } + delta = buf.Bytes() + } tags := append(p.cfg.tags.Slice(), fmt.Sprintf("profile_type:%s", name)) p.cfg.statsd.Timing("datadog.profiling.go.delta_time", time.Since(start), tags, 1) if err != nil { return nil, fmt.Errorf("delta profile error: %s", err) } - return delta.data, err + return delta, err } } @@ -273,25 +290,28 @@ func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) { tags := append(p.cfg.tags.Slice(), pt.Tag()) filename := t.Filename // TODO(fg): Consider making Collect() return the filename. - if p.cfg.deltaProfiles && t.SupportsDelta { + if p.cfg.deltaProfiles && t.Delta != nil { filename = "delta-" + filename } p.cfg.statsd.Timing("datadog.profiling.go.collect_time", end.Sub(start), tags, 1) return []*profile{{name: filename, data: data}}, nil } -// deltaProfile derives the delta profile between curData and the previous -// profile. If extra profiles are provided, they will be merged into the final -// profile after computing the delta profile. -func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []byte, extra ...*pprofile.Profile) (*profile, error) { +type deltaProfiler struct { + delta *pprofutils.Delta + prev *pprofile.Profile +} + +// Delta derives the delta profile between curData and the profile passed to the +// previous call to Delta. The first call to Delta will return the profile +// unchanged. +func (d *deltaProfiler) Delta(curData []byte) ([]byte, error) { curProf, err := pprofile.ParseData(curData) if err != nil { return nil, fmt.Errorf("delta prof parse: %v", err) } var deltaData []byte - p.mu.Lock() - prevProf := p.prev[name] - p.mu.Unlock() + prevProf := d.prev if prevProf == nil { // First time deltaProfile gets called for a type, there is no prevProf. In // this case we emit the current profile as a delta profile. @@ -301,7 +321,7 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData [] // Unfortunately the core implementation isn't resuable via a API, so we do // our own delta calculation below. // https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304 - deltaProf, err := delta.Convert(prevProf, curProf, extra...) + deltaProf, err := d.delta.Convert(prevProf, curProf) if err != nil { return nil, fmt.Errorf("delta prof merge: %v", err) } @@ -318,10 +338,8 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData [] } // Keep the most recent profiles in memory for future diffing. This needs to // be taken into account when enforcing memory limits going forward. - p.mu.Lock() - p.prev[name] = curProf - p.mu.Unlock() - return &profile{data: deltaData}, nil + d.prev = curProf + return deltaData, nil } func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) { diff --git a/profiler/profile_test.go b/profiler/profile_test.go index abe2caf25e..c9ca810a8f 100644 --- a/profiler/profile_test.go +++ b/profiler/profile_test.go @@ -115,7 +115,7 @@ main;bar 0 0 8 16 // followed by prof2 when calling runProfile(). deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) { returnProfs := [][]byte{prof1, prof2} - opts = append(opts, WithPeriod(5*time.Millisecond)) + opts = append(opts, WithPeriod(5*time.Millisecond), WithProfileTypes(HeapProfile, MutexProfile, BlockProfile)) p, err := unstartedProfiler(opts...) p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error { _, err := w.Write(returnProfs[0]) diff --git a/profiler/profiler.go b/profiler/profiler.go index a9d6adab42..6004cd67d2 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -20,8 +20,6 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - pprofile "github.com/google/pprof/profile" ) // outChannelSize specifies the size of the profile output channel. @@ -64,15 +62,14 @@ func Stop() { // profiler collects and sends preset profiles to the Datadog API at a given frequency // using a given configuration. type profiler struct { - mu sync.Mutex - cfg *config // profile configuration - out chan batch // upload queue - uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests - exit chan struct{} // exit signals the profiler to stop; it is closed after stopping - stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. - wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. - met *metrics // metric collector state - prev map[string]*pprofile.Profile // previous collection results for delta profiling + cfg *config // profile configuration + out chan batch // upload queue + uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests + exit chan struct{} // exit signals the profiler to stop; it is closed after stopping + stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. + wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. + met *metrics // metric collector state + deltas map[string]*deltaProfiler telemetry *telemetry.Client seq uint64 // seq is the value of the profile_seq tag pendingProfiles sync.WaitGroup // signal that profile collection is done, for stopping CPU profiling @@ -182,11 +179,16 @@ func newProfiler(opts ...Option) (*profiler, error) { } p := profiler{ - cfg: cfg, - out: make(chan batch, outChannelSize), - exit: make(chan struct{}), - met: newMetrics(), - prev: make(map[string]*pprofile.Profile), + cfg: cfg, + out: make(chan batch, outChannelSize), + exit: make(chan struct{}), + met: newMetrics(), + deltas: make(map[string]*deltaProfiler), + } + for pt := range cfg.types { + if d := profileTypes[pt].Delta; d != nil { + p.deltas[pt.lookup().Name] = &deltaProfiler{delta: d} + } } p.uploadFunc = p.upload p.telemetry = &telemetry.Client{ From 57374a0f7af15ac72cea5f408ce05e587c325039 Mon Sep 17 00:00:00 2001 From: Nick Ripley Date: Wed, 28 Sep 2022 08:33:28 -0400 Subject: [PATCH 2/4] profiler: add constructor for deltaProfiler Push more of the implementation details into the deltaProfiler type --- profiler/profile.go | 9 +++++++++ profiler/profiler.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/profiler/profile.go b/profiler/profile.go index 703ec3f8b9..47c6c3adeb 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -302,6 +302,15 @@ type deltaProfiler struct { prev *pprofile.Profile } +// newDeltaProfiler returns an initialized deltaProfiler. If value types +// are given (e.g. "alloc_space", "alloc_objects"), only those values will have +// deltas computed. Otherwise, deltas will be computed for every value. +func newDeltaProfiler(v ...pprofutils.ValueType) *deltaProfiler { + return &deltaProfiler{ + delta: &pprofutils.Delta{SampleTypes: v}, + } +} + // Delta derives the delta profile between curData and the profile passed to the // previous call to Delta. The first call to Delta will return the profile // unchanged. diff --git a/profiler/profiler.go b/profiler/profiler.go index 6004cd67d2..8ee732ea72 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -187,7 +187,7 @@ func newProfiler(opts ...Option) (*profiler, error) { } for pt := range cfg.types { if d := profileTypes[pt].Delta; d != nil { - p.deltas[pt.lookup().Name] = &deltaProfiler{delta: d} + p.deltas[pt.lookup().Name] = newDeltaProfiler(d.SampleTypes...) } } p.uploadFunc = p.upload From a5ee2e54f0f1c0557177d239b31a7110b1557cdc Mon Sep 17 00:00:00 2001 From: Nick Ripley Date: Thu, 29 Sep 2022 09:33:53 -0400 Subject: [PATCH 3/4] profiler: make delta do nothing by default, respond to review comments Change the Delta field of profileType to DeltaValue, a list of values for which to compute deltas. A length-0 DeltaValue means that the profile type doesn't support delta profiles. This gets rid of the overloaded meaning of Delta == nil meaning "do nothing" and len(Delta.SampleTypes) == 0 mean "do everything". --- profiler/internal/pprofutils/delta.go | 10 +------- profiler/internal/pprofutils/delta_test.go | 4 +++- profiler/profile.go | 28 +++++++++++++--------- profiler/profile_test.go | 26 ++++++++++---------- profiler/profiler.go | 4 ++-- 5 files changed, 36 insertions(+), 36 deletions(-) diff --git a/profiler/internal/pprofutils/delta.go b/profiler/internal/pprofutils/delta.go index 1326db36b7..f81fa169aa 100644 --- a/profiler/internal/pprofutils/delta.go +++ b/profiler/internal/pprofutils/delta.go @@ -17,8 +17,6 @@ type Delta struct { // SampleTypes limits the delta calcultion to the given sample types. Other // sample types will retain the values of profile b. The defined sample types // must exist in the profile, otherwise derivation will fail with an error. - // If the slice is empty, all sample types are subject to delta profile - // derivation. // // The use case for this for this is to deal with the heap profile which // contains alloc and inuse sample types, but delta profiling makes no sense @@ -35,13 +33,7 @@ func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) { found := 0 for i, st := range a.SampleType { - // Empty c.SampleTypes means we calculate the delta for every st - if len(d.SampleTypes) == 0 { - ratios[i] = -1 - continue - } - - // Otherwise we only calcuate the delta for any st that is listed in + // We only calcuate the delta for any st that is listed in // c.SampleTypes. st's not listed in there will default to ratio 0, which // means we delete them from pa, so only the pb values remain in the final // profile. diff --git a/profiler/internal/pprofutils/delta_test.go b/profiler/internal/pprofutils/delta_test.go index c922aa2847..9fdc588718 100644 --- a/profiler/internal/pprofutils/delta_test.go +++ b/profiler/internal/pprofutils/delta_test.go @@ -18,6 +18,7 @@ func TestDelta(t *testing.T) { var deltaText bytes.Buffer profA, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +x/count main;foo 5 main;foo;bar 3 main;foobar 4 @@ -25,13 +26,14 @@ main;foobar 4 require.NoError(t, err) profB, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +x/count main;foo 8 main;foo;bar 3 main;foobar 5 `))) require.NoError(t, err) - delta, err := Delta{}.Convert(profA, profB) + delta, err := Delta{SampleTypes: []ValueType{{Type: "x", Unit: "count"}}}.Convert(profA, profB) require.NoError(t, err) require.NoError(t, Protobuf{}.Convert(delta, &deltaText)) diff --git a/profiler/profile.go b/profiler/profile.go index 47c6c3adeb..0da8b32139 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -67,10 +67,10 @@ type profileType struct { // Collect collects the given profile and returns the data for it. Most // profiles will be in pprof format, i.e. gzip compressed proto buf data. Collect func(p *profiler) ([]byte, error) - // Delta identifies which values in profile samples should be modified - // when delta profiling is enabled, if the profile type supports delta - // profiles - Delta *pprofutils.Delta + // DeltaValues identifies which values in profile samples should be modified + // when delta profiling is enabled. Empty DeltaValues means delta profiling is + // not supported for this profile type + DeltaValues []pprofutils.ValueType } // profileTypes maps every ProfileType to its implementation. @@ -111,22 +111,28 @@ var profileTypes = map[ProfileType]profileType{ Name: "heap", Filename: "heap.pprof", Collect: collectGenericProfile("heap", HeapProfile), - Delta: &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ + DeltaValues: []pprofutils.ValueType{ {Type: "alloc_objects", Unit: "count"}, {Type: "alloc_space", Unit: "bytes"}, - }}, + }, }, MutexProfile: { Name: "mutex", Filename: "mutex.pprof", Collect: collectGenericProfile("mutex", MutexProfile), - Delta: &pprofutils.Delta{}, + DeltaValues: []pprofutils.ValueType{ + {Type: "contentions", Unit: "count"}, + {Type: "delay", Unit: "nanoseconds"}, + }, }, BlockProfile: { Name: "block", Filename: "block.pprof", Collect: collectGenericProfile("block", BlockProfile), - Delta: &pprofutils.Delta{}, + DeltaValues: []pprofutils.ValueType{ + {Type: "contentions", Unit: "count"}, + {Type: "delay", Unit: "nanoseconds"}, + }, }, GoroutineProfile: { Name: "goroutine", @@ -290,7 +296,7 @@ func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) { tags := append(p.cfg.tags.Slice(), pt.Tag()) filename := t.Filename // TODO(fg): Consider making Collect() return the filename. - if p.cfg.deltaProfiles && t.Delta != nil { + if p.cfg.deltaProfiles && len(t.DeltaValues) > 0 { filename = "delta-" + filename } p.cfg.statsd.Timing("datadog.profiling.go.collect_time", end.Sub(start), tags, 1) @@ -298,7 +304,7 @@ func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) { } type deltaProfiler struct { - delta *pprofutils.Delta + delta pprofutils.Delta prev *pprofile.Profile } @@ -307,7 +313,7 @@ type deltaProfiler struct { // deltas computed. Otherwise, deltas will be computed for every value. func newDeltaProfiler(v ...pprofutils.ValueType) *deltaProfiler { return &deltaProfiler{ - delta: &pprofutils.Delta{SampleTypes: v}, + delta: pprofutils.Delta{SampleTypes: v}, } } diff --git a/profiler/profile_test.go b/profiler/profile_test.go index c9ca810a8f..ba25f82f9d 100644 --- a/profiler/profile_test.go +++ b/profiler/profile_test.go @@ -43,29 +43,29 @@ func TestRunProfile(t *testing.T) { Prof1: textProfile{ Time: timeA, Text: ` -stuff/count -main 3 -main;bar 2 -main;foo 5 +contentions/count delay/nanoseconds +main 3 1 +main;bar 2 1 +main;foo 5 1 `, }, Prof2: textProfile{ Time: timeB, Text: ` -stuff/count -main 4 -main;bar 2 -main;foo 8 -main;foobar 7 +contentions/count delay/nanoseconds +main 4 1 +main;bar 2 1 +main;foo 8 1 +main;foobar 7 1 `, }, WantDelta: textProfile{ Time: timeA, Text: ` -stuff/count -main;foobar 7 -main;foo 3 -main 1 +contentions/count delay/nanoseconds +main;foobar 7 1 +main;foo 3 0 +main 1 0 `, }, WantDuration: deltaPeriod, diff --git a/profiler/profiler.go b/profiler/profiler.go index 8ee732ea72..3404c0d5e4 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -186,8 +186,8 @@ func newProfiler(opts ...Option) (*profiler, error) { deltas: make(map[string]*deltaProfiler), } for pt := range cfg.types { - if d := profileTypes[pt].Delta; d != nil { - p.deltas[pt.lookup().Name] = newDeltaProfiler(d.SampleTypes...) + if d := profileTypes[pt].DeltaValues; len(d) > 0 { + p.deltas[pt.lookup().Name] = newDeltaProfiler(d...) } } p.uploadFunc = p.upload From d796776975ff9b75b51e863ab2b76837807ae166 Mon Sep 17 00:00:00 2001 From: Nick Ripley Date: Fri, 30 Sep 2022 09:37:59 -0400 Subject: [PATCH 4/4] profiler: make deltas keys ProfileType instead of string --- profiler/profile.go | 2 +- profiler/profiler.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/profile.go b/profiler/profile.go index 0da8b32139..424753e1c0 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -202,7 +202,7 @@ func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byt var buf bytes.Buffer err := p.lookupProfile(name, &buf, 0) data := buf.Bytes() - dp, ok := p.deltas[name] + dp, ok := p.deltas[pt] if !ok || !p.cfg.deltaProfiles { return data, err } diff --git a/profiler/profiler.go b/profiler/profiler.go index 3404c0d5e4..bb0610216e 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -69,7 +69,7 @@ type profiler struct { stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. met *metrics // metric collector state - deltas map[string]*deltaProfiler + deltas map[ProfileType]*deltaProfiler telemetry *telemetry.Client seq uint64 // seq is the value of the profile_seq tag pendingProfiles sync.WaitGroup // signal that profile collection is done, for stopping CPU profiling @@ -183,11 +183,11 @@ func newProfiler(opts ...Option) (*profiler, error) { out: make(chan batch, outChannelSize), exit: make(chan struct{}), met: newMetrics(), - deltas: make(map[string]*deltaProfiler), + deltas: make(map[ProfileType]*deltaProfiler), } for pt := range cfg.types { if d := profileTypes[pt].DeltaValues; len(d) > 0 { - p.deltas[pt.lookup().Name] = newDeltaProfiler(d...) + p.deltas[pt] = newDeltaProfiler(d...) } } p.uploadFunc = p.upload