Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiler: wrap delta profiling in a type #1483

Merged
merged 4 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions profiler/internal/pprofutils/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ type Delta struct {
// SampleTypes limits the delta calcultion to the given sample types. Other
// sample types will retain the values of profile b. The defined sample types
// must exist in the profile, otherwise derivation will fail with an error.
// If the slice is empty, all sample types are subject to delta profile
// derivation.
//
// The use case for this for this is to deal with the heap profile which
// contains alloc and inuse sample types, but delta profiling makes no sense
Expand All @@ -30,21 +28,12 @@ type Delta struct {
// profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile
// a will be mutated by this function. You should pass a copy if that's
// undesirable.
//
// Other profiles that should be merged into the resulting profile can be passed
// through the extra parameter.
func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profile.Profile, error) {
func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) {
ratios := make([]float64, len(a.SampleType))

found := 0
for i, st := range a.SampleType {
// Empty c.SampleTypes means we calculate the delta for every st
if len(d.SampleTypes) == 0 {
ratios[i] = -1
continue
}

// Otherwise we only calcuate the delta for any st that is listed in
// We only calcuate the delta for any st that is listed in
// c.SampleTypes. st's not listed in there will default to ratio 0, which
// means we delete them from pa, so only the pb values remain in the final
// profile.
Expand All @@ -61,10 +50,7 @@ func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profi

a.ScaleN(ratios)

profiles := make([]*profile.Profile, 0, 2+len(extra))
profiles = append(profiles, a, b)
profiles = append(profiles, extra...)
delta, err := profile.Merge(profiles)
delta, err := profile.Merge([]*profile.Profile{a, b})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion profiler/internal/pprofutils/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@ func TestDelta(t *testing.T) {
var deltaText bytes.Buffer

profA, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(`
x/count
main;foo 5
main;foo;bar 3
main;foobar 4
`)))
require.NoError(t, err)

profB, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(`
x/count
main;foo 8
main;foo;bar 3
main;foobar 5
`)))
require.NoError(t, err)

delta, err := Delta{}.Convert(profA, profB)
delta, err := Delta{SampleTypes: []ValueType{{Type: "x", Unit: "count"}}}.Convert(profA, profB)
require.NoError(t, err)

require.NoError(t, Protobuf{}.Convert(delta, &deltaText))
Expand Down
101 changes: 67 additions & 34 deletions profiler/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ type profileType struct {
// this isn't done due to idiosyncratic filename used by the
// GoroutineProfile.
Filename string
// SupportsDelta indicates whether delta profiles can be computed for
// this profile type, which is used to determine the final filename
SupportsDelta bool
// Collect collects the given profile and returns the data for it. Most
// profiles will be in pprof format, i.e. gzip compressed proto buf data.
Collect func(p *profiler) ([]byte, error)
// DeltaValues identifies which values in profile samples should be modified
// when delta profiling is enabled. Empty DeltaValues means delta profiling is
// not supported for this profile type
DeltaValues []pprofutils.ValueType
}

// profileTypes maps every ProfileType to its implementation.
Expand Down Expand Up @@ -109,28 +110,34 @@ var profileTypes = map[ProfileType]profileType{
HeapProfile: {
Name: "heap",
Filename: "heap.pprof",
Collect: collectGenericProfile("heap", HeapProfile, &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{
Collect: collectGenericProfile("heap", HeapProfile),
DeltaValues: []pprofutils.ValueType{
{Type: "alloc_objects", Unit: "count"},
{Type: "alloc_space", Unit: "bytes"},
}}),
SupportsDelta: true,
},
},
MutexProfile: {
Name: "mutex",
Filename: "mutex.pprof",
Collect: collectGenericProfile("mutex", MutexProfile, &pprofutils.Delta{}),
SupportsDelta: true,
Name: "mutex",
Filename: "mutex.pprof",
Collect: collectGenericProfile("mutex", MutexProfile),
DeltaValues: []pprofutils.ValueType{
{Type: "contentions", Unit: "count"},
{Type: "delay", Unit: "nanoseconds"},
},
},
BlockProfile: {
Name: "block",
Filename: "block.pprof",
Collect: collectGenericProfile("block", BlockProfile, &pprofutils.Delta{}),
SupportsDelta: true,
Name: "block",
Filename: "block.pprof",
Collect: collectGenericProfile("block", BlockProfile),
DeltaValues: []pprofutils.ValueType{
{Type: "contentions", Unit: "count"},
{Type: "delay", Unit: "nanoseconds"},
},
},
GoroutineProfile: {
Name: "goroutine",
Filename: "goroutines.pprof",
Collect: collectGenericProfile("goroutine", GoroutineProfile, nil),
Collect: collectGenericProfile("goroutine", GoroutineProfile),
},
expGoroutineWaitProfile: {
Name: "goroutinewait",
Expand Down Expand Up @@ -166,9 +173,9 @@ var profileTypes = map[ProfileType]profileType{
},
}

func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) {
func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byte, error) {
return func(p *profiler) ([]byte, error) {
var extra []*pprofile.Profile
var extra *pprofile.Profile
cAlloc, ok := extensions.GetCAllocationProfiler()
switch {
case ok && p.cfg.cmemprofEnabled && p.cfg.deltaProfiles && pt == HeapProfile:
Expand All @@ -183,7 +190,7 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta)
p.interruptibleSleep(p.cfg.period)
profile, err := cAlloc.Stop()
if err == nil {
extra = append(extra, profile)
extra = profile
}
default:
// In all cases, sleep until the end of the profile
Expand All @@ -195,18 +202,34 @@ func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta)
var buf bytes.Buffer
err := p.lookupProfile(name, &buf, 0)
data := buf.Bytes()
if delta == nil || !p.cfg.deltaProfiles {
dp, ok := p.deltas[pt]
if !ok || !p.cfg.deltaProfiles {
return data, err
}

start := time.Now()
delta, err := p.deltaProfile(name, delta, data, extra...)
delta, err := dp.Delta(data)
if err == nil && extra != nil {
extended, err := pprofile.ParseData(delta)
if err != nil {
return nil, err
}
extended, err = pprofile.Merge([]*pprofile.Profile{extended, extra})
if err != nil {
return nil, err
}
buf.Reset()
if err := extended.Write(&buf); err != nil {
return nil, err
}
delta = buf.Bytes()
}
tags := append(p.cfg.tags.Slice(), fmt.Sprintf("profile_type:%s", name))
p.cfg.statsd.Timing("datadog.profiling.go.delta_time", time.Since(start), tags, 1)
if err != nil {
return nil, fmt.Errorf("delta profile error: %s", err)
}
return delta.data, err
return delta, err
}
}

Expand Down Expand Up @@ -273,25 +296,37 @@ func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) {
tags := append(p.cfg.tags.Slice(), pt.Tag())
filename := t.Filename
// TODO(fg): Consider making Collect() return the filename.
if p.cfg.deltaProfiles && t.SupportsDelta {
if p.cfg.deltaProfiles && len(t.DeltaValues) > 0 {
filename = "delta-" + filename
}
p.cfg.statsd.Timing("datadog.profiling.go.collect_time", end.Sub(start), tags, 1)
return []*profile{{name: filename, data: data}}, nil
}

// deltaProfile derives the delta profile between curData and the previous
// profile. If extra profiles are provided, they will be merged into the final
// profile after computing the delta profile.
func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []byte, extra ...*pprofile.Profile) (*profile, error) {
type deltaProfiler struct {
delta pprofutils.Delta
prev *pprofile.Profile
}

// newDeltaProfiler returns an initialized deltaProfiler. If value types
// are given (e.g. "alloc_space", "alloc_objects"), only those values will have
// deltas computed. Otherwise, deltas will be computed for every value.
func newDeltaProfiler(v ...pprofutils.ValueType) *deltaProfiler {
return &deltaProfiler{
delta: pprofutils.Delta{SampleTypes: v},
}
}

// Delta derives the delta profile between curData and the profile passed to the
// previous call to Delta. The first call to Delta will return the profile
// unchanged.
func (d *deltaProfiler) Delta(curData []byte) ([]byte, error) {
curProf, err := pprofile.ParseData(curData)
if err != nil {
return nil, fmt.Errorf("delta prof parse: %v", err)
}
var deltaData []byte
p.mu.Lock()
prevProf := p.prev[name]
p.mu.Unlock()
prevProf := d.prev
if prevProf == nil {
// First time deltaProfile gets called for a type, there is no prevProf. In
// this case we emit the current profile as a delta profile.
Expand All @@ -301,7 +336,7 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
// Unfortunately the core implementation isn't resuable via a API, so we do
// our own delta calculation below.
// https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304
deltaProf, err := delta.Convert(prevProf, curProf, extra...)
deltaProf, err := d.delta.Convert(prevProf, curProf)
if err != nil {
return nil, fmt.Errorf("delta prof merge: %v", err)
}
Expand All @@ -318,10 +353,8 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
}
// Keep the most recent profiles in memory for future diffing. This needs to
// be taken into account when enforcing memory limits going forward.
p.mu.Lock()
p.prev[name] = curProf
p.mu.Unlock()
return &profile{data: deltaData}, nil
d.prev = curProf
return deltaData, nil
}

func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) {
Expand Down
28 changes: 14 additions & 14 deletions profiler/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,29 @@ func TestRunProfile(t *testing.T) {
Prof1: textProfile{
Time: timeA,
Text: `
stuff/count
main 3
main;bar 2
main;foo 5
contentions/count delay/nanoseconds
main 3 1
main;bar 2 1
main;foo 5 1
`,
},
Prof2: textProfile{
Time: timeB,
Text: `
stuff/count
main 4
main;bar 2
main;foo 8
main;foobar 7
contentions/count delay/nanoseconds
main 4 1
main;bar 2 1
main;foo 8 1
main;foobar 7 1
`,
},
WantDelta: textProfile{
Time: timeA,
Text: `
stuff/count
main;foobar 7
main;foo 3
main 1
contentions/count delay/nanoseconds
main;foobar 7 1
main;foo 3 0
main 1 0
`,
},
WantDuration: deltaPeriod,
Expand Down Expand Up @@ -115,7 +115,7 @@ main;bar 0 0 8 16
// followed by prof2 when calling runProfile().
deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
returnProfs := [][]byte{prof1, prof2}
opts = append(opts, WithPeriod(5*time.Millisecond))
opts = append(opts, WithPeriod(5*time.Millisecond), WithProfileTypes(HeapProfile, MutexProfile, BlockProfile))
nsrip-dd marked this conversation as resolved.
Show resolved Hide resolved
p, err := unstartedProfiler(opts...)
p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
_, err := w.Write(returnProfs[0])
Expand Down
34 changes: 18 additions & 16 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

pprofile "github.com/google/pprof/profile"
)

// outChannelSize specifies the size of the profile output channel.
Expand Down Expand Up @@ -64,15 +62,14 @@ func Stop() {
// profiler collects and sends preset profiles to the Datadog API at a given frequency
// using a given configuration.
type profiler struct {
mu sync.Mutex
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
prev map[string]*pprofile.Profile // previous collection results for delta profiling
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
deltas map[ProfileType]*deltaProfiler
telemetry *telemetry.Client
seq uint64 // seq is the value of the profile_seq tag
pendingProfiles sync.WaitGroup // signal that profile collection is done, for stopping CPU profiling
Expand Down Expand Up @@ -182,11 +179,16 @@ func newProfiler(opts ...Option) (*profiler, error) {
}

p := profiler{
cfg: cfg,
out: make(chan batch, outChannelSize),
exit: make(chan struct{}),
met: newMetrics(),
prev: make(map[string]*pprofile.Profile),
cfg: cfg,
out: make(chan batch, outChannelSize),
exit: make(chan struct{}),
met: newMetrics(),
deltas: make(map[ProfileType]*deltaProfiler),
}
for pt := range cfg.types {
if d := profileTypes[pt].DeltaValues; len(d) > 0 {
p.deltas[pt] = newDeltaProfiler(d...)
}
}
p.uploadFunc = p.upload
p.telemetry = &telemetry.Client{
Expand Down