From 98fcfe8b6f3317e04339745a77a02bc13bba1240 Mon Sep 17 00:00:00 2001 From: Bradley Cicenas Date: Wed, 22 May 2019 16:58:55 +0000 Subject: [PATCH] refactor connectors for retry logic, add error view --- connector/docker.go | 46 ++++++++++++++++++------ connector/main.go | 85 ++++++++++++++++++++++++++++++++++++++++----- connector/mock.go | 13 +++++-- connector/runc.go | 56 +++++++++++++++++------------ cursor.go | 37 +++++++++++--------- go.mod | 1 + grid.go | 61 +++++++++++++++++++++++++++++--- main.go | 17 +++++---- widgets/error.go | 38 ++++++++++++++++++++ widgets/header.go | 2 +- 10 files changed, 284 insertions(+), 72 deletions(-) create mode 100644 widgets/error.go diff --git a/connector/docker.go b/connector/docker.go index d80f729..6595059 100644 --- a/connector/docker.go +++ b/connector/docker.go @@ -17,27 +17,45 @@ type Docker struct { client *api.Client containers map[string]*container.Container needsRefresh chan string // container IDs requiring refresh + closed chan struct{} lock sync.RWMutex } -func NewDocker() Connector { +func NewDocker() (Connector, error) { // init docker client client, err := api.NewClientFromEnv() if err != nil { - panic(err) + return nil, err } cm := &Docker{ client: client, containers: make(map[string]*container.Container), needsRefresh: make(chan string, 60), + closed: make(chan struct{}), lock: sync.RWMutex{}, } + + // query info as pre-flight healthcheck + info, err := client.Info() + if err != nil { + return nil, err + } + + log.Debugf("docker-connector ID: %s", info.ID) + log.Debugf("docker-connector Driver: %s", info.Driver) + log.Debugf("docker-connector Images: %d", info.Images) + log.Debugf("docker-connector Name: %s", info.Name) + log.Debugf("docker-connector ServerVersion: %s", info.ServerVersion) + go cm.Loop() cm.refreshAll() go cm.watchEvents() - return cm + return cm, nil } +// Docker implements Connector +func (cm *Docker) Wait() struct{} { return <-cm.closed } + // Docker events watcher func (cm *Docker) 
watchEvents() { log.Info("docker event listener starting") @@ -60,6 +78,8 @@ func (cm *Docker) watchEvents() { cm.delByID(e.ID) } } + log.Info("docker event listener exited") + close(cm.closed) } func portsFormat(ports map[api.Port][]api.PortBinding) string { @@ -114,7 +134,7 @@ func (cm *Docker) inspect(id string) *api.Container { c, err := cm.client.InspectContainer(id) if err != nil { if _, ok := err.(*api.NoSuchContainer); !ok { - log.Errorf(err.Error()) + log.Errorf("%s (%T)", err.Error(), err) } } return c @@ -125,7 +145,8 @@ func (cm *Docker) refreshAll() { opts := api.ListContainersOptions{All: true} allContainers, err := cm.client.ListContainers(opts) if err != nil { - panic(err) + log.Errorf("%s (%T)", err.Error(), err) + return } for _, i := range allContainers { @@ -137,9 +158,14 @@ func (cm *Docker) refreshAll() { } func (cm *Docker) Loop() { - for id := range cm.needsRefresh { - c := cm.MustGet(id) - cm.refresh(c) + for { + select { + case id := <-cm.needsRefresh: + c := cm.MustGet(id) + cm.refresh(c) + case <-cm.closed: + return + } } } @@ -161,7 +187,7 @@ func (cm *Docker) MustGet(id string) *container.Container { return c } -// Get a single container, by ID +// Docker implements Connector func (cm *Docker) Get(id string) (*container.Container, bool) { cm.lock.Lock() c, ok := cm.containers[id] @@ -177,7 +203,7 @@ func (cm *Docker) delByID(id string) { log.Infof("removed dead container: %s", id) } -// All returns array of all containers, sorted by field +// Docker implements Connector func (cm *Docker) All() (containers container.Containers) { cm.lock.Lock() for _, c := range cm.containers { diff --git a/connector/main.go b/connector/main.go index 809a501..61e3a13 100644 --- a/connector/main.go +++ b/connector/main.go @@ -3,6 +3,8 @@ package connector import ( "fmt" "sort" + "sync" + "time" "github.com/bcicen/ctop/container" "github.com/bcicen/ctop/logging" @@ -10,9 +12,79 @@ import ( var ( log = logging.Init() - enabled = make(map[string]func() 
Connector) + enabled = make(map[string]ConnectorFn) ) +type ConnectorFn func() (Connector, error) + +type Connector interface { + // All returns a pre-sorted container.Containers of all discovered containers + All() container.Containers + // Get returns a single container.Container by ID + Get(string) (*container.Container, bool) + // Wait waits for the underlying connection to be lost before returning + Wait() struct{} +} + +// ConnectorSuper provides initial connection and retry on failure for +// an underlying Connector type +type ConnectorSuper struct { + conn Connector + connFn ConnectorFn + err error + lock sync.RWMutex +} + +func NewConnectorSuper(connFn ConnectorFn) *ConnectorSuper { + cs := &ConnectorSuper{ + connFn: connFn, + err: fmt.Errorf("connecting..."), + } + go cs.loop() + return cs +} + +// Get returns the underlying Connector, or nil and an error +// if the Connector is not yet initialized or is disconnected. +func (cs *ConnectorSuper) Get() (Connector, error) { + cs.lock.RLock() + defer cs.lock.RUnlock() + if cs.err != nil { + return nil, cs.err + } + return cs.conn, nil +} + +func (cs *ConnectorSuper) setError(err error) { + cs.lock.Lock() + defer cs.lock.Unlock() + cs.err = err +} + +func (cs *ConnectorSuper) loop() { + const interval = 3 + for { + log.Infof("initializing connector") + + conn, err := cs.connFn() + if err != nil { + cs.setError(fmt.Errorf("%s\n\nattempting to reconnect...", err)) + log.Errorf("failed to initialize connector: %s (%T)", err, err) + log.Errorf("retrying in %ds", interval) + time.Sleep(interval * time.Second) + } else { + cs.conn = conn + cs.setError(nil) + log.Infof("successfully initialized connector") + + // wait until connection closed + cs.conn.Wait() + cs.setError(fmt.Errorf("attempting to reconnect...")) + log.Infof("connector closed") + } + } +} + // Enabled returns names for all enabled connectors on the current platform func Enabled() (a []string) { for k, _ := range enabled { @@ -22,14 +94,11 @@ func 
Enabled() (a []string) { return a } -func ByName(s string) (Connector, error) { +// ByName returns a ConnectorSuper for a given name, or error if the connector +// does not exist on the current platform +func ByName(s string) (*ConnectorSuper, error) { if cfn, ok := enabled[s]; ok { - return cfn(), nil + return NewConnectorSuper(cfn), nil } return nil, fmt.Errorf("invalid connector type \"%s\"", s) } - -type Connector interface { - All() container.Containers - Get(string) (*container.Container, bool) -} diff --git a/connector/mock.go b/connector/mock.go index 59fb15b..d96496b 100644 --- a/connector/mock.go +++ b/connector/mock.go @@ -20,11 +20,11 @@ type Mock struct { containers container.Containers } -func NewMock() Connector { +func NewMock() (Connector, error) { cs := &Mock{} go cs.Init() go cs.Loop() - return cs + return cs, nil } // Create Mock containers @@ -41,6 +41,15 @@ func (cs *Mock) Init() { } +func (cs *Mock) Wait() struct{} { + ch := make(chan struct{}) + go func() { + time.Sleep(30 * time.Second) + close(ch) + }() + return <-ch +} + func (cs *Mock) makeContainer(aggression int64) { collector := collector.NewMock(aggression) manager := manager.NewMock() diff --git a/connector/runc.go b/connector/runc.go index e1b7638..c9f7c87 100644 --- a/connector/runc.go +++ b/connector/runc.go @@ -54,35 +54,44 @@ type Runc struct { factory libcontainer.Factory containers map[string]*container.Container libContainers map[string]libcontainer.Container + closed chan struct{} needsRefresh chan string // container IDs requiring refresh lock sync.RWMutex } -func NewRunc() Connector { +func NewRunc() (Connector, error) { opts, err := NewRuncOpts() - runcFailOnErr(err) + if err != nil { + return nil, err + } factory, err := getFactory(opts) - runcFailOnErr(err) + if err != nil { + return nil, err + } cm := &Runc{ opts: opts, factory: factory, containers: make(map[string]*container.Container), libContainers: make(map[string]libcontainer.Container), needsRefresh: 
make(chan string, 60), + closed: make(chan struct{}), lock: sync.RWMutex{}, } go func() { for { - cm.refreshAll() - time.Sleep(5 * time.Second) + select { + case <-cm.closed: + return + case <-time.After(5 * time.Second): + cm.refreshAll() + } } }() go cm.Loop() - return cm + return cm, nil } func (cm *Runc) GetLibc(id string) libcontainer.Container { @@ -141,7 +150,11 @@ func (cm *Runc) refresh(id string) { // Read runc root, creating any new containers func (cm *Runc) refreshAll() { list, err := ioutil.ReadDir(cm.opts.root) - runcFailOnErr(err) + if err != nil { + log.Errorf("%s (%T)", err.Error(), err) + close(cm.closed) + return + } for _, i := range list { if i.IsDir() { @@ -199,14 +212,6 @@ func (cm *Runc) MustGet(id string) *container.Container { return c } -// Get a single container, by ID -func (cm *Runc) Get(id string) (*container.Container, bool) { - cm.lock.Lock() - defer cm.lock.Unlock() - c, ok := cm.containers[id] - return c, ok -} - // Remove containers by ID func (cm *Runc) delByID(id string) { cm.lock.Lock() @@ -216,7 +221,18 @@ func (cm *Runc) delByID(id string) { log.Infof("removed dead container: %s", id) } -// All returns array of all containers, sorted by field +// Runc implements Connector +func (cm *Runc) Wait() struct{} { return <-cm.closed } + +// Runc implements Connector +func (cm *Runc) Get(id string) (*container.Container, bool) { + cm.lock.Lock() + defer cm.lock.Unlock() + c, ok := cm.containers[id] + return c, ok +} + +// Runc implements Connector func (cm *Runc) All() (containers container.Containers) { cm.lock.Lock() for _, c := range cm.containers { @@ -239,9 +255,3 @@ func getFactory(opts RuncOpts) (libcontainer.Factory, error) { } return libcontainer.New(opts.root, cgroupManager) } - -func runcFailOnErr(err error) { - if err != nil { - panic(fmt.Errorf("fatal runc error: %s", err)) - } -} diff --git a/cursor.go b/cursor.go index 1001074..6c5a6e8 100644 --- a/cursor.go +++ b/cursor.go @@ -11,7 +11,7 @@ import ( type GridCursor 
struct { selectedID string // id of currently selected container filtered container.Containers - cSource connector.Connector + cSuper *connector.ConnectorSuper isScrolling bool // toggled when actively scrolling } @@ -25,14 +25,20 @@ func (gc *GridCursor) Selected() *container.Container { return nil } -// Refresh containers from source -func (gc *GridCursor) RefreshContainers() (lenChanged bool) { +// Refresh containers from source, returning whether the quantity of +// containers has changed and any error +func (gc *GridCursor) RefreshContainers() (bool, error) { oldLen := gc.Len() - - // Containers filtered by display bool gc.filtered = container.Containers{} + + cSource, err := gc.cSuper.Get() + if err != nil { + return true, err + } + + // filter Containers by display bool var cursorVisible bool - for _, c := range gc.cSource.All() { + for _, c := range cSource.All() { if c.Display { if c.Id == gc.selectedID { cursorVisible = true @@ -41,22 +47,21 @@ func (gc *GridCursor) RefreshContainers() (lenChanged bool) { } } - if oldLen != gc.Len() { - lenChanged = true - } - - if !cursorVisible { + if !cursorVisible || gc.selectedID == "" { gc.Reset() } - if gc.selectedID == "" { - gc.Reset() - } - return lenChanged + + return oldLen != gc.Len(), nil } // Set an initial cursor position, if possible func (gc *GridCursor) Reset() { - for _, c := range gc.cSource.All() { + cSource, err := gc.cSuper.Get() + if err != nil { + return + } + + for _, c := range cSource.All() { c.Widgets.UnHighlight() } if gc.Len() > 0 { diff --git a/go.mod b/go.mod index 4921cba..9afb509 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d github.com/op/go-logging v0.0.0-20160211212156-b2cb9fa56473 github.com/opencontainers/runc v0.1.1 + github.com/pkg/errors v0.8.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/seccomp/libseccomp-golang v0.0.0-20150813023252-1b506fc7c24e // indirect github.com/stretchr/testify 
v1.2.2 // indirect diff --git a/grid.go b/grid.go index 871d3cb..882d730 100644 --- a/grid.go +++ b/grid.go @@ -6,6 +6,43 @@ import ( ui "github.com/gizak/termui" ) +func ShowConnError(err error) (exit bool) { + ui.Clear() + ui.DefaultEvtStream.ResetHandlers() + defer ui.DefaultEvtStream.ResetHandlers() + + setErr := func(err error) { + errView.Text = err.Error() + ui.Render(errView) + } + + HandleKeys("exit", func() { + exit = true + ui.StopLoop() + }) + + ui.Handle("/timer/1s", func(ui.Event) { + _, err := cursor.RefreshContainers() + if err == nil { + ui.StopLoop() + return + } + setErr(err) + }) + + ui.Handle("/sys/wnd/resize", func(e ui.Event) { + errView.Resize() + ui.Clear() + ui.Render(errView) + log.Infof("RESIZE") + }) + + errView.Resize() + setErr(err) + ui.Loop() + return exit +} + func RedrawRows(clr bool) { // reinit body rows cGrid.Clear() @@ -33,7 +70,6 @@ func RedrawRows(clr bool) { } cGrid.Align() ui.Render(cGrid) - } func SingleView() MenuFn { @@ -68,16 +104,21 @@ func SingleView() MenuFn { return nil } -func RefreshDisplay() { +func RefreshDisplay() error { // skip display refresh during scroll if !cursor.isScrolling { - needsClear := cursor.RefreshContainers() + needsClear, err := cursor.RefreshContainers() + if err != nil { + return err + } RedrawRows(needsClear) } + return nil } func Display() bool { var menu MenuFn + var connErr error cGrid.SetWidth(ui.TermWidth()) ui.DefaultEvtStream.Hook(logEvent) @@ -126,7 +167,10 @@ func Display() bool { }) ui.Handle("/sys/kbd/a", func(ui.Event) { config.Toggle("allContainers") - RefreshDisplay() + connErr = RefreshDisplay() + if connErr != nil { + ui.StopLoop() + } }) ui.Handle("/sys/kbd/D", func(ui.Event) { dumpContainer(cursor.Selected()) @@ -160,7 +204,10 @@ func Display() bool { if log.StatusQueued() { ui.StopLoop() } - RefreshDisplay() + connErr = RefreshDisplay() + if connErr != nil { + ui.StopLoop() + } }) ui.Handle("/sys/wnd/resize", func(e ui.Event) { @@ -174,6 +221,10 @@ func Display() bool { 
ui.Loop() + if connErr != nil { + return ShowConnError(connErr) + } + if log.StatusQueued() { for sm := range log.FlushStatus() { if sm.IsError { diff --git a/main.go b/main.go index f2eac66..cd6a184 100644 --- a/main.go +++ b/main.go @@ -22,11 +22,12 @@ var ( version = "dev-build" goVersion = runtime.Version() - log *logging.CTopLogger - cursor *GridCursor - cGrid *compact.CompactGrid - header *widgets.CTopHeader - status *widgets.StatusLine + log *logging.CTopLogger + cursor *GridCursor + cGrid *compact.CompactGrid + header *widgets.CTopHeader + status *widgets.StatusLine + errView *widgets.ErrorView versionStr = fmt.Sprintf("ctop version %v, build %v %v", version, build, goVersion) ) @@ -104,14 +105,15 @@ func main() { defer Shutdown() // init grid, cursor, header - conn, err := connector.ByName(*connectorFlag) + cSuper, err := connector.ByName(*connectorFlag) if err != nil { panic(err) } - cursor = &GridCursor{cSource: conn} + cursor = &GridCursor{cSuper: cSuper} cGrid = compact.NewCompactGrid() header = widgets.NewCTopHeader() status = widgets.NewStatusLine() + errView = widgets.NewErrorView() for { exit := Display() @@ -140,6 +142,7 @@ func validSort(s string) { func panicExit() { if r := recover(); r != nil { Shutdown() + panic(r) fmt.Printf("error: %s\n", r) os.Exit(1) } diff --git a/widgets/error.go b/widgets/error.go new file mode 100644 index 0000000..458ccaa --- /dev/null +++ b/widgets/error.go @@ -0,0 +1,38 @@ +package widgets + +import ( + "fmt" + ui "github.com/gizak/termui" +) + +type ErrorView struct { + *ui.Par +} + +func NewErrorView() *ErrorView { + p := ui.NewPar("") + p.Border = true + p.Height = 10 + p.Width = 20 + p.PaddingTop = 1 + p.PaddingBottom = 1 + p.PaddingLeft = 2 + p.PaddingRight = 2 + p.Bg = ui.ThemeAttr("bg") + p.TextFgColor = ui.ThemeAttr("status.warn") + p.TextBgColor = ui.ThemeAttr("menu.text.bg") + p.BorderFg = ui.ThemeAttr("status.warn") + p.BorderLabelFg = ui.ThemeAttr("status.warn") + return &ErrorView{p} +} + +func (w 
*ErrorView) Buffer() ui.Buffer { + w.BorderLabel = fmt.Sprintf(" %s ", timeStr()) + return w.Par.Buffer() +} + +func (w *ErrorView) Resize() { + w.SetX(ui.TermWidth() / 12) + w.SetY(ui.TermHeight() / 3) + w.SetWidth(w.X * 10) +} diff --git a/widgets/header.go b/widgets/header.go index 4a96f8f..a7ab786 100644 --- a/widgets/header.go +++ b/widgets/header.go @@ -16,7 +16,7 @@ type CTopHeader struct { func NewCTopHeader() *CTopHeader { return &CTopHeader{ - Time: headerPar(2, timeStr()), + Time: headerPar(2, ""), Count: headerPar(24, "-"), Filter: headerPar(40, ""), bg: headerBg(),