Skip to content

Commit

Permalink
Add pprof to runc-shim
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Wang <henwang@amazon.com>
  • Loading branch information
henry118 committed Jun 13, 2024
1 parent e49d3fd commit 503225f
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 105 deletions.
104 changes: 74 additions & 30 deletions cmd/containerd-shim-runc-v2/manager/manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -126,6 +127,59 @@ func (m manager) Name() string {
return m.name
}

// shimSocket bundles the pieces that make up one listening socket handed to
// the shim process: the address it is bound to, the listener itself, and the
// duplicated file obtained from the listener.
type shimSocket struct {
	// addr is the socket address; it is also what Close passes to
	// shim.RemoveSocket.
	addr string
	// s is the unix listener bound to addr. It may be nil when the value only
	// carries an address.
	s *net.UnixListener
	// f is the file obtained from the listener. It may be nil.
	f *os.File
}

// Close tears down everything the shimSocket holds: the listener and the
// file are closed when present, and the socket address is then removed.
// Every step is best effort; errors are intentionally discarded.
func (s *shimSocket) Close() {
	if listener := s.s; listener != nil {
		_ = listener.Close()
	}
	if file := s.f; file != nil {
		_ = file.Close()
	}
	_ = shim.RemoveSocket(s.addr)
}

// newShimSocket resolves the socket address for (path, id, debug), creates a
// listener on it and obtains the listener's file.
//
// If the address is already in use, debug is false and the existing socket
// accepts connections, it returns a shimSocket carrying only the address
// together with errdefs.ErrAlreadyExists so the caller can reuse the running
// shim. Otherwise a pre-existing socket is removed and creation is retried
// once.
func newShimSocket(ctx context.Context, path, id string, debug bool) (*shimSocket, error) {
	address, err := shim.SocketAddress(ctx, path, id, debug)
	if err != nil {
		return nil, err
	}

	listener, err := shim.NewSocket(address)
	if err != nil {
		// The only time this happens is when there is a bug and the socket
		// was not cleaned up in the shim's cleanup method, or when grouping
		// is in use and the new process should run with the same shim as an
		// existing container.
		switch {
		case !shim.SocketEaddrinuse(err):
			return nil, fmt.Errorf("create new shim socket: %w", err)
		case !debug && shim.CanConnect(address):
			return &shimSocket{addr: address}, errdefs.ErrAlreadyExists
		}

		if rmErr := shim.RemoveSocket(address); rmErr != nil {
			return nil, fmt.Errorf("remove pre-existing socket: %w", rmErr)
		}
		listener, err = shim.NewSocket(address)
		if err != nil {
			return nil, fmt.Errorf("try create new shim socket 2x: %w", err)
		}
	}

	file, err := listener.File()
	if err != nil {
		// Release the listener and remove the socket before reporting failure.
		(&shimSocket{addr: address, s: listener}).Close()
		return nil, err
	}

	return &shimSocket{addr: address, s: listener, f: file}, nil
}

func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shim.BootstrapParams, retErr error) {
var params shim.BootstrapParams
params.Version = 3
Expand All @@ -146,44 +200,35 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shi
break
}
}
address, err := shim.SocketAddress(ctx, opts.Address, grouping)
if err != nil {
return params, err
}

socket, err := shim.NewSocket(address)
if err != nil {
// the only time where this would happen is if there is a bug and the socket
// was not cleaned up in the cleanup method of the shim or we are using the
// grouping functionality where the new process should be run with the same
// shim as an existing container
if !shim.SocketEaddrinuse(err) {
return params, fmt.Errorf("create new shim socket: %w", err)
}
if shim.CanConnect(address) {
params.Address = address
return params, nil
}
if err := shim.RemoveSocket(address); err != nil {
return params, fmt.Errorf("remove pre-existing socket: %w", err)
}
if socket, err = shim.NewSocket(address); err != nil {
return params, fmt.Errorf("try create new shim socket 2x: %w", err)
}
}
var sockets []*shimSocket
defer func() {
if retErr != nil {
socket.Close()
_ = shim.RemoveSocket(address)
for _, s := range sockets {
s.Close()
}
}
}()

f, err := socket.File()
s, err := newShimSocket(ctx, opts.Address, grouping, false)
if err != nil {
if errdefs.IsAlreadyExists(err) {
params.Address = s.addr
return params, nil
}
return params, err
}
sockets = append(sockets, s)
cmd.ExtraFiles = append(cmd.ExtraFiles, s.f)

cmd.ExtraFiles = append(cmd.ExtraFiles, f)
if opts.Debug {
s, err = newShimSocket(ctx, opts.Address, grouping, true)
if err != nil {
return params, err
}
sockets = append(sockets, s)
cmd.ExtraFiles = append(cmd.ExtraFiles, s.f)
}

goruntime.LockOSThread()
if os.Getenv("SCHED_CORE") != "" {
Expand All @@ -193,7 +238,6 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shi
}

if err := cmd.Start(); err != nil {
f.Close()
return params, err
}

Expand Down Expand Up @@ -233,7 +277,7 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shi
return params, fmt.Errorf("failed to adjust OOM score for shim: %w", err)
}

params.Address = address
params.Address = sockets[0].addr
return params, nil
}

Expand Down
175 changes: 110 additions & 65 deletions cmd/ctr/commands/pprof/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,7 @@ var pprofGoroutinesCommand = &cli.Command{
},
},
Action: func(context *cli.Context) error {
client := getPProfClient(context)

debug := context.Uint("debug")
output, err := httpGetRequest(client, fmt.Sprintf("/debug/pprof/goroutine?debug=%d", debug))
if err != nil {
return err
}
defer output.Close()
_, err = io.Copy(os.Stdout, output)
return err
return GoroutineProfile(context, getPProfClient)
},
}

Expand All @@ -89,16 +80,7 @@ var pprofHeapCommand = &cli.Command{
},
},
Action: func(context *cli.Context) error {
client := getPProfClient(context)

debug := context.Uint("debug")
output, err := httpGetRequest(client, fmt.Sprintf("/debug/pprof/heap?debug=%d", debug))
if err != nil {
return err
}
defer output.Close()
_, err = io.Copy(os.Stdout, output)
return err
return HeapProfile(context, getPProfClient)
},
}

Expand All @@ -119,17 +101,7 @@ var pprofProfileCommand = &cli.Command{
},
},
Action: func(context *cli.Context) error {
client := getPProfClient(context)

seconds := context.Duration("seconds").Seconds()
debug := context.Uint("debug")
output, err := httpGetRequest(client, fmt.Sprintf("/debug/pprof/profile?seconds=%v&debug=%d", seconds, debug))
if err != nil {
return err
}
defer output.Close()
_, err = io.Copy(os.Stdout, output)
return err
return CPUProfile(context, getPProfClient)
},
}

Expand All @@ -150,18 +122,7 @@ var pprofTraceCommand = &cli.Command{
},
},
Action: func(context *cli.Context) error {
client := getPProfClient(context)

seconds := context.Duration("seconds").Seconds()
debug := context.Uint("debug")
uri := fmt.Sprintf("/debug/pprof/trace?seconds=%v&debug=%d", seconds, debug)
output, err := httpGetRequest(client, uri)
if err != nil {
return err
}
defer output.Close()
_, err = io.Copy(os.Stdout, output)
return err
return TraceProfile(context, getPProfClient)
},
}

Expand All @@ -176,16 +137,7 @@ var pprofBlockCommand = &cli.Command{
},
},
Action: func(context *cli.Context) error {
client := getPProfClient(context)

debug := context.Uint("debug")
output, err := httpGetRequest(client, fmt.Sprintf("/debug/pprof/block?debug=%d", debug))
if err != nil {
return err
}
defer output.Close()
_, err = io.Copy(os.Stdout, output)
return err
return BlockProfile(context, getPProfClient)
},
}

Expand All @@ -200,27 +152,120 @@ var pprofThreadcreateCommand = &cli.Command{
},
},
Action: func(context *cli.Context) error {
client := getPProfClient(context)

debug := context.Uint("debug")
output, err := httpGetRequest(client, fmt.Sprintf("/debug/pprof/threadcreate?debug=%d", debug))
if err != nil {
return err
}
defer output.Close()
_, err = io.Copy(os.Stdout, output)
return err
return ThreadcreateProfile(context, getPProfClient)
},
}

func getPProfClient(context *cli.Context) *http.Client {
// Client is a factory function that builds the HTTP client used to talk to a
// pprof server from the given CLI context, or returns an error if the client
// cannot be created.
type Client func(context *cli.Context) (*http.Client, error)

// GoroutineProfile requests the goroutine profile from the pprof server
// reached through clientFunc and copies the response to standard output.
// The value of the "debug" flag is forwarded as the debug query parameter.
func GoroutineProfile(context *cli.Context, clientFunc Client) error {
	httpClient, err := clientFunc(context)
	if err != nil {
		return err
	}

	uri := fmt.Sprintf("/debug/pprof/goroutine?debug=%d", context.Uint("debug"))
	body, err := httpGetRequest(httpClient, uri)
	if err != nil {
		return err
	}
	defer body.Close()

	if _, err := io.Copy(os.Stdout, body); err != nil {
		return err
	}
	return nil
}

// HeapProfile requests the heap profile from the pprof server reached through
// clientFunc and copies the response to standard output. The value of the
// "debug" flag is forwarded as the debug query parameter.
func HeapProfile(context *cli.Context, clientFunc Client) error {
	httpClient, err := clientFunc(context)
	if err != nil {
		return err
	}

	uri := fmt.Sprintf("/debug/pprof/heap?debug=%d", context.Uint("debug"))
	body, err := httpGetRequest(httpClient, uri)
	if err != nil {
		return err
	}
	defer body.Close()

	if _, err := io.Copy(os.Stdout, body); err != nil {
		return err
	}
	return nil
}

// CPUProfile requests a CPU profile from the pprof server reached through
// clientFunc and copies the response to standard output.
//
// The sampling window comes from the "seconds" duration flag and the "debug"
// flag is forwarded unchanged. The window is sent as a whole number of
// seconds, rounded up: the standard library's net/http/pprof profile handler
// parses "seconds" as an integer and silently falls back to its 30 second
// default when parsing fails, which is what a float rendering such as "1.5"
// or "1e+06" would trigger.
// NOTE(review): assumes the server uses the net/http/pprof handlers — confirm.
func CPUProfile(context *cli.Context, clientFunc Client) error {
	client, err := clientFunc(context)
	if err != nil {
		return err
	}

	seconds := context.Duration("seconds").Seconds()
	// Round up to the next whole second so a sub-second request does not
	// collapse to 0, which the handler would treat as "use the default".
	wholeSeconds := int64(seconds)
	if float64(wholeSeconds) < seconds {
		wholeSeconds++
	}
	debug := context.Uint("debug")

	output, err := httpGetRequest(client, fmt.Sprintf("/debug/pprof/profile?seconds=%d&debug=%d", wholeSeconds, debug))
	if err != nil {
		return err
	}
	defer output.Close()

	_, err = io.Copy(os.Stdout, output)
	return err
}

// TraceProfile requests an execution trace from the pprof server reached
// through clientFunc and copies the response to standard output. The
// "seconds" duration flag (expressed in seconds) and the "debug" flag are
// forwarded as query parameters.
func TraceProfile(context *cli.Context, clientFunc Client) error {
	httpClient, err := clientFunc(context)
	if err != nil {
		return err
	}

	window := context.Duration("seconds").Seconds()
	level := context.Uint("debug")
	body, err := httpGetRequest(httpClient, fmt.Sprintf("/debug/pprof/trace?seconds=%v&debug=%d", window, level))
	if err != nil {
		return err
	}
	defer body.Close()

	if _, err := io.Copy(os.Stdout, body); err != nil {
		return err
	}
	return nil
}

// BlockProfile requests the block profile from the pprof server reached
// through clientFunc and copies the response to standard output. The value of
// the "debug" flag is forwarded as the debug query parameter.
func BlockProfile(context *cli.Context, clientFunc Client) error {
	httpClient, err := clientFunc(context)
	if err != nil {
		return err
	}

	uri := fmt.Sprintf("/debug/pprof/block?debug=%d", context.Uint("debug"))
	body, err := httpGetRequest(httpClient, uri)
	if err != nil {
		return err
	}
	defer body.Close()

	if _, err := io.Copy(os.Stdout, body); err != nil {
		return err
	}
	return nil
}

// ThreadcreateProfile requests the threadcreate profile from the pprof server
// reached through clientFunc and copies the response to standard output. The
// value of the "debug" flag is forwarded as the debug query parameter.
func ThreadcreateProfile(context *cli.Context, clientFunc Client) error {
	httpClient, err := clientFunc(context)
	if err != nil {
		return err
	}

	uri := fmt.Sprintf("/debug/pprof/threadcreate?debug=%d", context.Uint("debug"))
	body, err := httpGetRequest(httpClient, uri)
	if err != nil {
		return err
	}
	defer body.Close()

	if _, err := io.Copy(os.Stdout, body); err != nil {
		return err
	}
	return nil
}

// getPProfClient builds an HTTP client whose transport dials through the
// pprof dialer created for the address given by the "debug-socket" flag.
//
// The error result is always nil in this implementation; the two-value
// signature lets the function be passed wherever a Client is expected.
func getPProfClient(context *cli.Context) (*http.Client, error) {
	dialer := getPProfDialer(context.String("debug-socket"))

	tr := &http.Transport{
		Dial: dialer.pprofDial,
	}
	client := &http.Client{Transport: tr}
	return client, nil
}

func httpGetRequest(client *http.Client, request string) (io.ReadCloser, error) {
Expand Down
Loading

0 comments on commit 503225f

Please sign in to comment.