Skip to content

Commit

Permalink
bigslice: extract dump package from auto-log
Browse files Browse the repository at this point in the history
Summary:
Extract a `diagnostic/dump` package from bigslice auto-log. This merges work that we paused in D33024 with the recently added auto-log, hopefully leaving us with the best of both worlds:

  - Create a new `diagnostic/dump` package that handles the actual dumping of diagnostics.  It has these changes from auto-log:
    - Expose API for parts of the dump to be registered externally (i.e. anything importing the package can add to the diagnostic dump).
    - Stop relying on HTTP. Given that we want as much reliability as possible when dumping diagnostics, bypassing HTTP as a layer probably gives us a better shot at it.
    - Rename some of the files in the dump to be more consistent with route names.
    - Accommodate use cases that the HTTP endpoints don't handle yet (which we'll fix separately), specifically that we can have multiple `bigmachine.B`s and `bigslice.Session`s in a single process.
    - Make the dump a gzipped tar archive instead of a zip file (just because that's what was in D33024).
  - Make the dump filename more informative: include the executable name, the process start time, and the elapsed time since start.
    - Add a few more diagnostic files: load, CPU, memory, and command-line.
  - Make auto-log use `diagnostic/dump`.
  - Expose an endpoint, `/debug/dump`, that allows you to retrieve a diagnostic dump on demand.

Test Plan:
Add and run unit tests.
Run manual tests.

Reviewers: pgopal, marius, joshnewman

Reviewed By: marius, joshnewman

Subscribers: smahadevan

Differential Revision: https://phabricator.grailbio.com/D37556

fbshipit-source-id: 736a799
  • Loading branch information
jcharum authored and mariusae committed Oct 22, 2019
1 parent 078f177 commit d91dc24
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 69 deletions.
48 changes: 43 additions & 5 deletions bigmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package bigmachine
import (
"context"
"expvar"
"fmt"
"html/template"
"net/http"
"sort"
"strings"
"sync/atomic"
"time"

// Sha256 is imported because we use its implementation for
Expand All @@ -19,6 +21,7 @@ import (
"os"
"sync"

"github.com/grailbio/base/diagnostic/dump"
"github.com/grailbio/base/errors"
"github.com/grailbio/base/log"
"github.com/grailbio/bigmachine/rpc"
Expand All @@ -32,6 +35,13 @@ const RpcPrefix = "/bigrpc/"
// of testing situations, there is exactly one per process.
type B struct {
system System
// index is a process-unique identifier for this B. It is useful for
// distinguishing logs or diagnostic information.
index int32
// name is a human-usable name for this B that can be provided by clients.
// Like index, it is useful for distinguishing logs or diagnostic
// information, but may be set to something contextually meaningful.
name string

server *rpc.Server
client *rpc.Client
Expand All @@ -42,6 +52,20 @@ type B struct {
running bool
}

// Option is an option that can be provided when starting a new B. It is a
// function that can modify the b that will be returned by Start. Options
// are applied, in order, before the returned B begins running.
type Option func(b *B)

// Name returns an Option that assigns the given human-usable name to the
// started B. Like the B's index, the name is useful for distinguishing logs
// and diagnostic information; see B.name.
func Name(name string) Option {
	setName := func(b *B) {
		b.name = name
	}
	return setName
}

// nextBIndex is the index that will be assigned to the next B started by
// Start. It is incremented atomically (see Start), so each B gets a
// process-unique index.
var nextBIndex int32

// Start is the main entry point of bigmachine. Start starts a new B
// using the provided system, returning the instance. B's shutdown
// method should be called to tear down the session, usually in a
Expand All @@ -54,11 +78,15 @@ type B struct {
//
// // bigmachine driver code
// }
func Start(system System) *B {
func Start(system System, opts ...Option) *B {
b := &B{
index: atomic.AddInt32(&nextBIndex, 1) - 1,
system: system,
machines: make(map[string]*Machine),
}
for _, opt := range opts {
opt(b)
}
b.run()
// Test systems run in a single process space and thus
// expvar would panic with duplicate key errors.
Expand All @@ -67,6 +95,19 @@ func Start(system System) *B {
if system.Name() != "testsystem" && expvar.Get("machines") == nil {
expvar.Publish("machines", &machineVars{b})
}

if system.Name() != "testsystem" {
pfx := fmt.Sprintf("bigmachine-%02d-", b.index)
if b.name != "" {
pfx += fmt.Sprintf("%s-", b.name)
}
dump.Register(pfx+"status", makeStatusDumpFunc(b))
dump.Register(pfx+"pprof-goroutine", makeProfileDumpFunc(b, "goroutine", 2))
// TODO(jcharumilind): Should the heap profile do a gc?
dump.Register(pfx+"pprof-heap", makeProfileDumpFunc(b, "heap", 0))
dump.Register(pfx+"pprof-mutex", makeProfileDumpFunc(b, "mutex", 1))
dump.Register(pfx+"pprof-profile", makeProfileDumpFunc(b, "profile", 0))
}
return b
}

Expand Down Expand Up @@ -219,17 +260,14 @@ func (b *B) Machines() []*Machine {
return snapshot
}

// HandleDebug registers diagnostic http endpoints on the provided
// ServeMux.
// HandleDebug registers diagnostic http endpoints on the provided ServeMux.
// It is equivalent to HandleDebugPrefix with the prefix "/debug/bigmachine/".
func (b *B) HandleDebug(mux *http.ServeMux) {
	b.HandleDebugPrefix("/debug/bigmachine/", mux)
}

// HandleDebugPrefix registers diagnostic http endpoints on the provided
// ServeMux under the provided prefix: CPU ("pprof/profile") and heap
// ("pprof/heap") profile endpoints, a pprof index page ("pprof/"), and a
// cluster status page ("status").
func (b *B) HandleDebugPrefix(prefix string, mux *http.ServeMux) {
	mux.Handle(prefix+"pprof/profile", &profileHandler{b, "profile"})
	mux.Handle(prefix+"pprof/heap", &profileHandler{b, "heap"})
	mux.HandleFunc(prefix+"pprof/", b.pprofIndex)
	mux.Handle(prefix+"status", &statusHandler{b})
}
Expand Down
182 changes: 127 additions & 55 deletions profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/google/pprof/profile"
"github.com/grailbio/base/diagnostic/dump"
"github.com/grailbio/base/log"
"golang.org/x/sync/errgroup"
)
Expand All @@ -30,41 +31,115 @@ type profileHandler struct {
which string
}

func (p *profileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sec64, _ := strconv.ParseInt(r.FormValue("seconds"), 10, 64)
sec := int(sec64)
if sec == 0 {
sec = 30
func (h *profileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
sec64, _ = strconv.ParseInt(r.FormValue("seconds"), 10, 64)
sec = int(sec64)
debug, _ = strconv.Atoi(r.FormValue("debug"))
gc, _ = strconv.Atoi(r.FormValue("gc"))
addr = r.FormValue("machine")
)
p := profiler{
b: h.b,
which: h.which,
addr: addr,
sec: sec,
debug: debug,
gc: gc > 0,
}
debug, _ := strconv.Atoi(r.FormValue("debug"))
gc, _ := strconv.Atoi(r.FormValue("gc"))
// If a machine is specified, we pass through the profile directly.
if addr := r.FormValue("machine"); addr != "" {
ctx := r.Context()
m, err := p.b.Dial(ctx, addr)
if err != nil {
profileErrorf(w, http.StatusInternalServerError, "failed to dial machine: %v", err)
return
w.Header().Set("Content-Type", p.ContentType())
err := p.Marshal(r.Context(), w)
if err != nil {
code := http.StatusInternalServerError
if _, ok := err.(errNoProfiles); ok {
code = http.StatusNotFound
}
rc, err := getProfile(ctx, m, p.which, sec, debug, gc > 0)
profileErrorf(w, code, err.Error())
}
}

// getProfile fetches the named profile from machine m over RPC. The CPU
// profile ("profile") is requested via Supervisor.CPUProfile with a sampling
// duration of sec seconds; every other profile is requested via
// Supervisor.Profile with the given debug and gc options. On success, the
// caller is responsible for closing the returned ReadCloser.
func getProfile(ctx context.Context, m *Machine, which string, sec, debug int, gc bool) (rc io.ReadCloser, err error) {
	if which == "profile" {
		// Only the CPU profile takes a sampling duration.
		err = m.Call(ctx, "Supervisor.CPUProfile", time.Duration(sec)*time.Second, &rc)
	} else {
		err = m.Call(ctx, "Supervisor.Profile", profileRequest{which, debug, gc}, &rc)
	}
	return
}

func profileErrorf(w http.ResponseWriter, code int, message string, args ...interface{}) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Go-Pprof", "1")
w.WriteHeader(code)
if _, err := fmt.Fprintf(w, message, args...); err != nil {
log.Printf("error writing profile 500: %v", err)
}
}

// profiler writes (possibly aggregated) profiles, configured by its fields.
// Marshal performs the collection and writing; ContentType reports the
// matching HTTP Content-Type for successful output.
type profiler struct {
	// b is the B whose machines are profiled.
	b *B
	// which is the name of the profile to write.
	which string
	// addr is the specific machine's profile to write. If addr == "", profiles
	// are aggregated from all of b's machines.
	addr string
	// sec is the number of seconds for which to generate a CPU profile. It is
	// only relevant when which == "profile".
	sec int
	// debug is the debug value passed to pprof to determine the format of the
	// profile output. See pprof documentation for details.
	debug int
	// gc determines whether we request a garbage collection before taking the
	// profile. This is only relevant when which == "heap".
	gc bool
}

// errNoProfiles is a marker error type indicating that no profiles were
// available from any of the cluster machines. HTTP handlers use it to
// respond with StatusNotFound rather than an internal server error when
// writing the profile in an HTTP response.
type errNoProfiles string

// Error implements the error interface, returning the message verbatim.
func (e errNoProfiles) Error() string {
	return string(e)
}

// ContentType returns the expected content type, assuming success, of a
// call to Marshal. It is used to set the Content-Type header when the
// profile is written in an HTTP response; the header may be overridden if
// an error occurs.
func (p profiler) ContentType() string {
	switch {
	case p.debug > 0 && p.which != "profile":
		// Debug output of non-CPU profiles is human-readable text.
		return "text/plain; charset=utf-8"
	default:
		return "application/octet-stream"
	}
}

// Marshal writes the profile configured in pw to w.
func (p profiler) Marshal(ctx context.Context, w io.Writer) (err error) {
if p.addr != "" {
m, err := p.b.Dial(ctx, p.addr)
if err != nil {
profileErrorf(w, http.StatusInternalServerError, "failed to collect %s profile: %v", p.which, err)
return
return fmt.Errorf("failed to dial machine: %v", err)
}
defer rc.Close()
if debug > 0 && p.which != "profile" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
} else {
w.Header().Set("Content-Type", "application/octet-stream")
rc, err := getProfile(ctx, m, p.which, p.sec, p.debug, p.gc)
if err != nil {
return fmt.Errorf("failed to collect %s profile: %v", p.which, err)
}
defer func() {
cerr := rc.Close()
if err == nil {
err = cerr
}
}()
_, err = io.Copy(w, rc)
if err != nil {
profileErrorf(w, http.StatusInternalServerError, "failed to write %s profile: %v", p.which, err)
return fmt.Errorf("failed to write %s profile: %v", p.which, err)
}
return
return nil
}

g, ctx := errgroup.WithContext(r.Context())
g, ctx := errgroup.WithContext(ctx)
var (
mu sync.Mutex
profiles = map[*Machine][]byte{}
Expand All @@ -75,13 +150,18 @@ func (p *profileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
continue
}
m := m
g.Go(func() error {
rc, err := getProfile(ctx, m, p.which, sec, debug, gc > 0)
g.Go(func() (err error) {
rc, err := getProfile(ctx, m, p.which, p.sec, p.debug, p.gc)
if err != nil {
log.Error.Printf("failed to collect profile %s from %s: %v", p.which, m.Addr, err)
return nil
}
defer rc.Close()
defer func() {
cerr := rc.Close()
if err == nil {
err = cerr
}
}()
b, err := ioutil.ReadAll(rc)
if err != nil {
log.Error.Printf("failed to read profile from %s: %v", m.Addr, err)
Expand All @@ -94,16 +174,13 @@ func (p *profileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
})
}
if err := g.Wait(); err != nil {
profileErrorf(w, http.StatusInternalServerError, "failed to fetch profiles: %v", err)
return
return fmt.Errorf("failed to fetch profiles: %v", err)
}
if len(profiles) == 0 {
profileErrorf(w, http.StatusNotFound, "no profiles are available at this time")
return
return errNoProfiles("no profiles are available at this time")
}
// Debug output is intended for human consumption.
if debug > 0 {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
if p.debug > 0 && p.which != "profiles" {
sort.Slice(machines, func(i, j int) bool { return machines[i].Addr < machines[j].Addr })
for _, m := range machines {
prof := profiles[m]
Expand All @@ -114,41 +191,36 @@ func (p *profileHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(prof)
fmt.Fprintln(w)
}
return
return nil
}

var parsed []*profile.Profile
for m, b := range profiles {
prof, err := profile.Parse(bytes.NewReader(b))
if err != nil {
log.Error.Printf("failed to parse profile from %s: %v", m.Addr, err)
continue
return fmt.Errorf("failed to parse profile from %s: %v", m.Addr, err)
}
parsed = append(parsed, prof)
}
prof, err := profile.Merge(parsed)
if err != nil {
profileErrorf(w, http.StatusInternalServerError, "profile merge error: %v", err)
return
return fmt.Errorf("profile merge error: %v", err)
}
w.Header().Set("Content-Type", "application/octet-stream")
if err := prof.Write(w); err != nil {
profileErrorf(w, http.StatusInternalServerError, "failed to write profile: %v", err)
return fmt.Errorf("failed to write profile: %v", err)
}
return nil
}

func getProfile(ctx context.Context, m *Machine, which string, sec, debug int, gc bool) (rc io.ReadCloser, err error) {
if which == "profile" {
err = m.Call(ctx, "Supervisor.CPUProfile", time.Duration(sec)*time.Second, &rc)
} else {
err = m.Call(ctx, "Supervisor.Profile", profileRequest{which, debug, gc}, &rc)
func makeProfileDumpFunc(b *B, which string, debug int) dump.Func {
p := profiler{
b: b,
which: which,
sec: 30,
debug: debug,
gc: false,
}
return func(ctx context.Context, w io.Writer) error {
return p.Marshal(ctx, w)
}
return
}

func profileErrorf(w http.ResponseWriter, code int, message string, args ...interface{}) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Go-Pprof", "1")
w.WriteHeader(code)
fmt.Fprintf(w, message, args...)
}
Loading

0 comments on commit d91dc24

Please sign in to comment.