Skip to content

Commit

Permalink
profiler: wrap delta profiling in a type (#1483)
Browse files Browse the repository at this point in the history
Wrap up delta profiling in a type with a Delta method. This type holds
any information about the previous profile needed to compute the delta
with the newest profile. The profiler keeps an instance of type for each
profile type which supports delta profiling. This better encapsulates
the implementation details of delta profiling, and facilitates upcoming
implementation changes.

As part of this change, pull the logic for merging in "extra" profiles
out of the delta profiling code path. This logic was implemented for C
allocation profiling, and the extra data was passed through to delta
profiling mainly for efficiency. However, merging in extra data is
orthogonal to delta profiling and we can find other ways to make that
more efficient if we need to.

Change the Delta field of profileType to DeltaValue, a list of values
for which to compute deltas. A length-0 DeltaValue means that the
profile type doesn't support delta profiles. This gets rid of the
overloaded meaning of Delta == nil meaning "do nothing" and
len(Delta.SampleTypes) == 0 mean "do everything".
  • Loading branch information
nsrip-dd committed Sep 30, 2022
1 parent 0775735 commit 77945bc
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 82 deletions.
20 changes: 3 additions & 17 deletions profiler/internal/pprofutils/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ type Delta struct {
// SampleTypes limits the delta calcultion to the given sample types. Other
// sample types will retain the values of profile b. The defined sample types
// must exist in the profile, otherwise derivation will fail with an error.
// If the slice is empty, all sample types are subject to delta profile
// derivation.
//
// The use case for this for this is to deal with the heap profile which
// contains alloc and inuse sample types, but delta profiling makes no sense
Expand All @@ -30,21 +28,12 @@ type Delta struct {
// profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile
// a will be mutated by this function. You should pass a copy if that's
// undesirable.
//
// Other profiles that should be merged into the resulting profile can be passed
// through the extra parameter.
func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profile.Profile, error) {
func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) {
ratios := make([]float64, len(a.SampleType))

found := 0
for i, st := range a.SampleType {
// Empty c.SampleTypes means we calculate the delta for every st
if len(d.SampleTypes) == 0 {
ratios[i] = -1
continue
}

// Otherwise we only calcuate the delta for any st that is listed in
// We only calcuate the delta for any st that is listed in
// c.SampleTypes. st's not listed in there will default to ratio 0, which
// means we delete them from pa, so only the pb values remain in the final
// profile.
Expand All @@ -61,10 +50,7 @@ func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profi

a.ScaleN(ratios)

profiles := make([]*profile.Profile, 0, 2+len(extra))
profiles = append(profiles, a, b)
profiles = append(profiles, extra...)
delta, err := profile.Merge(profiles)
delta, err := profile.Merge([]*profile.Profile{a, b})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion profiler/internal/pprofutils/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@ func TestDelta(t *testing.T) {
var deltaText bytes.Buffer

profA, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(`
x/count
main;foo 5
main;foo;bar 3
main;foobar 4
`)))
require.NoError(t, err)

profB, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(`
x/count
main;foo 8
main;foo;bar 3
main;foobar 5
`)))
require.NoError(t, err)

delta, err := Delta{}.Convert(profA, profB)
delta, err := Delta{SampleTypes: []ValueType{{Type: "x", Unit: "count"}}}.Convert(profA, profB)
require.NoError(t, err)

require.NoError(t, Protobuf{}.Convert(delta, &deltaText))
Expand Down
101 changes: 67 additions & 34 deletions profiler/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ type profileType struct {
// this isn't done due to idiosyncratic filename used by the
// GoroutineProfile.
Filename string
// SupportsDelta indicates whether delta profiles can be computed for
// this profile type, which is used to determine the final filename
SupportsDelta bool
// Collect collects the given profile and returns the data for it. Most
// profiles will be in pprof format, i.e. gzip compressed proto buf data.
Collect func(p *profiler) ([]byte, error)
// DeltaValues identifies which values in profile samples should be modified
// when delta profiling is enabled. Empty DeltaValues means delta profiling is
// not supported for this profile type
DeltaValues []pprofutils.ValueType
}

// profileTypes maps every ProfileType to its implementation.
Expand Down Expand Up @@ -109,28 +110,34 @@ var profileTypes = map[ProfileType]profileType{
HeapProfile: {
Name: "heap",
Filename: "heap.pprof",
Collect: collectGenericProfile("heap", HeapProfile, &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{
Collect: collectGenericProfile("heap", HeapProfile),
DeltaValues: []pprofutils.ValueType{
{Type: "alloc_objects", Unit: "count"},
{Type: "alloc_space", Unit: "bytes"},
}}),
SupportsDelta: true,
},
},
MutexProfile: {
Name: "mutex",
Filename: "mutex.pprof",
Collect: collectGenericProfile("mutex", MutexProfile, &pprofutils.Delta{}),
SupportsDelta: true,
Name: "mutex",
Filename: "mutex.pprof",
Collect: collectGenericProfile("mutex", MutexProfile),
DeltaValues: []pprofutils.ValueType{
{Type: "contentions", Unit: "count"},
{Type: "delay", Unit: "nanoseconds"},
},
},
BlockProfile: {
Name: "block",
Filename: "block.pprof",
Collect: collectGenericProfile("block", BlockProfile, &pprofutils.Delta{}),
SupportsDelta: true,
Name: "block",
Filename: "block.pprof",
Collect: collectGenericProfile("block", BlockProfile),
DeltaValues: []pprofutils.ValueType{
{Type: "contentions", Unit: "count"},
{Type: "delay", Unit: "nanoseconds"},
},
},
GoroutineProfile: {
Name: "goroutine",
Filename: "goroutines.pprof",
Collect: collectGenericProfile("goroutine", GoroutineProfile, nil),
Collect: collectGenericProfile("goroutine", GoroutineProfile),
},
expGoroutineWaitProfile: {
Name: "goroutinewait",
Expand Down Expand Up @@ -166,9 +173,9 @@ var profileTypes = map[ProfileType]profileType{
},
}

func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) {
func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byte, error) {
return func(p *profiler) ([]byte, error) {
var extra []*pprofile.Profile
var extra *pprofile.Profile
cAlloc, ok := extensions.GetCAllocationProfiler()
switch {
case ok && p.cfg.cmemprofEnabled && p.cfg.deltaProfiles && pt == HeapProfile:
Expand All @@ -183,7 +190,7 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta)
p.interruptibleSleep(p.cfg.period)
profile, err := cAlloc.Stop()
if err == nil {
extra = append(extra, profile)
extra = profile
}
default:
// In all cases, sleep until the end of the profile
Expand All @@ -195,18 +202,34 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta)
var buf bytes.Buffer
err := p.lookupProfile(name, &buf, 0)
data := buf.Bytes()
if delta == nil || !p.cfg.deltaProfiles {
dp, ok := p.deltas[pt]
if !ok || !p.cfg.deltaProfiles {
return data, err
}

start := time.Now()
delta, err := p.deltaProfile(name, delta, data, extra...)
delta, err := dp.Delta(data)
if err == nil && extra != nil {
extended, err := pprofile.ParseData(delta)
if err != nil {
return nil, err
}
extended, err = pprofile.Merge([]*pprofile.Profile{extended, extra})
if err != nil {
return nil, err
}
buf.Reset()
if err := extended.Write(&buf); err != nil {
return nil, err
}
delta = buf.Bytes()
}
tags := append(p.cfg.tags.Slice(), fmt.Sprintf("profile_type:%s", name))
p.cfg.statsd.Timing("datadog.profiling.go.delta_time", time.Since(start), tags, 1)
if err != nil {
return nil, fmt.Errorf("delta profile error: %s", err)
}
return delta.data, err
return delta, err
}
}

Expand Down Expand Up @@ -273,25 +296,37 @@ func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) {
tags := append(p.cfg.tags.Slice(), pt.Tag())
filename := t.Filename
// TODO(fg): Consider making Collect() return the filename.
if p.cfg.deltaProfiles && t.SupportsDelta {
if p.cfg.deltaProfiles && len(t.DeltaValues) > 0 {
filename = "delta-" + filename
}
p.cfg.statsd.Timing("datadog.profiling.go.collect_time", end.Sub(start), tags, 1)
return []*profile{{name: filename, data: data}}, nil
}

// deltaProfile derives the delta profile between curData and the previous
// profile. If extra profiles are provided, they will be merged into the final
// profile after computing the delta profile.
func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []byte, extra ...*pprofile.Profile) (*profile, error) {
type deltaProfiler struct {
delta pprofutils.Delta
prev *pprofile.Profile
}

// newDeltaProfiler returns an initialized deltaProfiler. If value types
// are given (e.g. "alloc_space", "alloc_objects"), only those values will have
// deltas computed. Otherwise, deltas will be computed for every value.
func newDeltaProfiler(v ...pprofutils.ValueType) *deltaProfiler {
return &deltaProfiler{
delta: pprofutils.Delta{SampleTypes: v},
}
}

// Delta derives the delta profile between curData and the profile passed to the
// previous call to Delta. The first call to Delta will return the profile
// unchanged.
func (d *deltaProfiler) Delta(curData []byte) ([]byte, error) {
curProf, err := pprofile.ParseData(curData)
if err != nil {
return nil, fmt.Errorf("delta prof parse: %v", err)
}
var deltaData []byte
p.mu.Lock()
prevProf := p.prev[name]
p.mu.Unlock()
prevProf := d.prev
if prevProf == nil {
// First time deltaProfile gets called for a type, there is no prevProf. In
// this case we emit the current profile as a delta profile.
Expand All @@ -301,7 +336,7 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
// Unfortunately the core implementation isn't resuable via a API, so we do
// our own delta calculation below.
// https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304
deltaProf, err := delta.Convert(prevProf, curProf, extra...)
deltaProf, err := d.delta.Convert(prevProf, curProf)
if err != nil {
return nil, fmt.Errorf("delta prof merge: %v", err)
}
Expand All @@ -318,10 +353,8 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
}
// Keep the most recent profiles in memory for future diffing. This needs to
// be taken into account when enforcing memory limits going forward.
p.mu.Lock()
p.prev[name] = curProf
p.mu.Unlock()
return &profile{data: deltaData}, nil
d.prev = curProf
return deltaData, nil
}

func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) {
Expand Down
28 changes: 14 additions & 14 deletions profiler/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,29 @@ func TestRunProfile(t *testing.T) {
Prof1: textProfile{
Time: timeA,
Text: `
stuff/count
main 3
main;bar 2
main;foo 5
contentions/count delay/nanoseconds
main 3 1
main;bar 2 1
main;foo 5 1
`,
},
Prof2: textProfile{
Time: timeB,
Text: `
stuff/count
main 4
main;bar 2
main;foo 8
main;foobar 7
contentions/count delay/nanoseconds
main 4 1
main;bar 2 1
main;foo 8 1
main;foobar 7 1
`,
},
WantDelta: textProfile{
Time: timeA,
Text: `
stuff/count
main;foobar 7
main;foo 3
main 1
contentions/count delay/nanoseconds
main;foobar 7 1
main;foo 3 0
main 1 0
`,
},
WantDuration: deltaPeriod,
Expand Down Expand Up @@ -115,7 +115,7 @@ main;bar 0 0 8 16
// followed by prof2 when calling runProfile().
deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
returnProfs := [][]byte{prof1, prof2}
opts = append(opts, WithPeriod(5*time.Millisecond))
opts = append(opts, WithPeriod(5*time.Millisecond), WithProfileTypes(HeapProfile, MutexProfile, BlockProfile))
p, err := unstartedProfiler(opts...)
p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
_, err := w.Write(returnProfs[0])
Expand Down
34 changes: 18 additions & 16 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

pprofile "github.com/google/pprof/profile"
)

// outChannelSize specifies the size of the profile output channel.
Expand Down Expand Up @@ -64,15 +62,14 @@ func Stop() {
// profiler collects and sends preset profiles to the Datadog API at a given frequency
// using a given configuration.
type profiler struct {
mu sync.Mutex
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
prev map[string]*pprofile.Profile // previous collection results for delta profiling
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
deltas map[ProfileType]*deltaProfiler
telemetry *telemetry.Client
seq uint64 // seq is the value of the profile_seq tag
pendingProfiles sync.WaitGroup // signal that profile collection is done, for stopping CPU profiling
Expand Down Expand Up @@ -182,11 +179,16 @@ func newProfiler(opts ...Option) (*profiler, error) {
}

p := profiler{
cfg: cfg,
out: make(chan batch, outChannelSize),
exit: make(chan struct{}),
met: newMetrics(),
prev: make(map[string]*pprofile.Profile),
cfg: cfg,
out: make(chan batch, outChannelSize),
exit: make(chan struct{}),
met: newMetrics(),
deltas: make(map[ProfileType]*deltaProfiler),
}
for pt := range cfg.types {
if d := profileTypes[pt].DeltaValues; len(d) > 0 {
p.deltas[pt] = newDeltaProfiler(d...)
}
}
p.uploadFunc = p.upload
p.telemetry = &telemetry.Client{
Expand Down

0 comments on commit 77945bc

Please sign in to comment.