Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiler: ensure that CPU profile records profiler work #1485

Merged
merged 6 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions profiler/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ type profileType struct {
// profileTypes maps every ProfileType to its implementation.
var profileTypes = map[ProfileType]profileType{
CPUProfile: {
Name: "cpu",
Filename: "cpu.pprof",
Collect: func(p *profiler) ([]byte, error) {
var buf bytes.Buffer
// Start the CPU profiler at the end of the profiling
// period so that we're sure to capture the CPU usage of
// this library, which mostly happens at the end
p.interruptibleSleep(p.cfg.period - p.cfg.cpuDuration)
if p.cfg.cpuProfileRate != 0 {
// The profile has to be set each time before
// profiling is started. Otherwise,
Expand All @@ -90,6 +93,10 @@ var profileTypes = map[ProfileType]profileType{
return nil, err
}
p.interruptibleSleep(p.cfg.cpuDuration)
// We want the CPU profiler to finish last so that it can
// properly record all of our profile processing work for
// the other profile types
p.pendingProfiles.Wait()
p.stopCPUProfile()
return buf.Bytes(), nil
},
Expand All @@ -102,7 +109,7 @@ var profileTypes = map[ProfileType]profileType{
HeapProfile: {
Name: "heap",
Filename: "heap.pprof",
Collect: collectGenericProfile("heap", &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{
Collect: collectGenericProfile("heap", HeapProfile, &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{
{Type: "alloc_objects", Unit: "count"},
{Type: "alloc_space", Unit: "bytes"},
}}),
Expand All @@ -111,19 +118,19 @@ var profileTypes = map[ProfileType]profileType{
MutexProfile: {
Name: "mutex",
Filename: "mutex.pprof",
Collect: collectGenericProfile("mutex", &pprofutils.Delta{}),
Collect: collectGenericProfile("mutex", MutexProfile, &pprofutils.Delta{}),
SupportsDelta: true,
},
BlockProfile: {
Name: "block",
Filename: "block.pprof",
Collect: collectGenericProfile("block", &pprofutils.Delta{}),
Collect: collectGenericProfile("block", BlockProfile, &pprofutils.Delta{}),
SupportsDelta: true,
},
GoroutineProfile: {
Name: "goroutine",
Filename: "goroutines.pprof",
Collect: collectGenericProfile("goroutine", nil),
Collect: collectGenericProfile("goroutine", GoroutineProfile, nil),
},
expGoroutineWaitProfile: {
Name: "goroutinewait",
Expand Down Expand Up @@ -159,13 +166,12 @@ var profileTypes = map[ProfileType]profileType{
},
}

func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) {
func collectGenericProfile(name string, pt ProfileType, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I tried making collectGenericProfile just take a ProfileType argument and drop the less type-safe name argument. However, this creates an initialization cycle between the profileTypes map and the collectGenericProfile function, since we still want to get the profile name for doing the lookup, and getting the name requires accessing profileTypes.

Not sure how much refactoring to do in this PR. I could give ProfileType a name() string method that just does a switch statement, and remove Name from profileType? Or leave it kind of awkward for now and refactor in a subsequent PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave it awkward for now, it's ok. We'll come back to it.

return func(p *profiler) ([]byte, error) {
var extra []*pprofile.Profile
		// TODO: add type safety for name == "heap" check and remove redundancy with profileType.Name.
cAlloc, ok := extensions.GetCAllocationProfiler()
switch {
case ok && p.cfg.cmemprofEnabled && p.cfg.deltaProfiles && name == "heap":
case ok && p.cfg.cmemprofEnabled && p.cfg.deltaProfiles && pt == HeapProfile:
// For the heap profile, we'd also like to include C
// allocations if that extension is enabled and have the
// allocations show up in the same profile. Collect them
Expand Down
2 changes: 1 addition & 1 deletion profiler/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ main;bar 0 0 8 16
})

t.Run("cpu", func(t *testing.T) {
p, err := unstartedProfiler(CPUDuration(10 * time.Millisecond))
p, err := unstartedProfiler(CPUDuration(10*time.Millisecond), WithPeriod(10*time.Millisecond))
nsrip-dd marked this conversation as resolved.
Show resolved Hide resolved
p.testHooks.startCPUProfile = func(w io.Writer) error {
_, err := w.Write([]byte("my-cpu-profile"))
return err
Expand Down
37 changes: 26 additions & 11 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,18 @@ func Stop() {
// profiler collects and sends preset profiles to the Datadog API at a given frequency
// using a given configuration.
type profiler struct {
mu sync.Mutex
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
prev map[string]*pprofile.Profile // previous collection results for delta profiling
telemetry *telemetry.Client
seq uint64 // seq is the value of the profile_seq tag
mu sync.Mutex
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
prev map[string]*pprofile.Profile // previous collection results for delta profiling
telemetry *telemetry.Client
seq uint64 // seq is the value of the profile_seq tag
pendingProfiles sync.WaitGroup // signal that profile collection is done, for stopping CPU profiling

testHooks testHooks
}
Expand Down Expand Up @@ -303,10 +304,24 @@ func (p *profiler) collect(ticker <-chan time.Time) {
p.seq++

completed = completed[:0]
// We need to increment pendingProfiles for every non-CPU
// profile _before_ entering the next loop so that we know CPU
// profiling will not complete until every other profile is
// finished (because p.pendingProfiles will have been
// incremented to count every non-CPU profile before CPU
// profiling starts)
for _, t := range p.enabledProfileTypes() {
if t != CPUProfile {
p.pendingProfiles.Add(1)
}
}
felixge marked this conversation as resolved.
Show resolved Hide resolved
felixge marked this conversation as resolved.
Show resolved Hide resolved
for _, t := range p.enabledProfileTypes() {
wg.Add(1)
go func(t ProfileType) {
defer wg.Done()
if t != CPUProfile {
defer p.pendingProfiles.Done()
}
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
Expand Down