Skip to content

Commit

Permalink
refactor connectors for retry logic, add error view
Browse files Browse the repository at this point in the history
  • Loading branch information
bcicen committed May 22, 2019
1 parent 42f095c commit 98fcfe8
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 72 deletions.
46 changes: 36 additions & 10 deletions connector/docker.go
Expand Up @@ -17,27 +17,45 @@ type Docker struct {
client *api.Client
containers map[string]*container.Container
needsRefresh chan string // container IDs requiring refresh
closed chan struct{}
lock sync.RWMutex
}

func NewDocker() Connector {
func NewDocker() (Connector, error) {
// init docker client
client, err := api.NewClientFromEnv()
if err != nil {
panic(err)
return nil, err
}
cm := &Docker{
client: client,
containers: make(map[string]*container.Container),
needsRefresh: make(chan string, 60),
closed: make(chan struct{}),
lock: sync.RWMutex{},
}

// query info as pre-flight healthcheck
info, err := client.Info()
if err != nil {
return nil, err
}

log.Debugf("docker-connector ID: %s", info.ID)
log.Debugf("docker-connector Driver: %s", info.Driver)
log.Debugf("docker-connector Images: %d", info.Images)
log.Debugf("docker-connector Name: %s", info.Name)
log.Debugf("docker-connector ServerVersion: %s", info.ServerVersion)

go cm.Loop()
cm.refreshAll()
go cm.watchEvents()
return cm
return cm, nil
}

// Wait blocks until the event-listener goroutine closes cm.closed,
// signaling that this connector has shut down (Connector interface).
func (cm *Docker) Wait() struct{} {
	<-cm.closed
	return struct{}{}
}

// Docker events watcher
func (cm *Docker) watchEvents() {
log.Info("docker event listener starting")
Expand All @@ -60,6 +78,8 @@ func (cm *Docker) watchEvents() {
cm.delByID(e.ID)
}
}
log.Info("docker event listener exited")
close(cm.closed)
}

func portsFormat(ports map[api.Port][]api.PortBinding) string {
Expand Down Expand Up @@ -114,7 +134,7 @@ func (cm *Docker) inspect(id string) *api.Container {
c, err := cm.client.InspectContainer(id)
if err != nil {
if _, ok := err.(*api.NoSuchContainer); !ok {
log.Errorf(err.Error())
log.Errorf("%s (%T)", err.Error(), err)
}
}
return c
Expand All @@ -125,7 +145,8 @@ func (cm *Docker) refreshAll() {
opts := api.ListContainersOptions{All: true}
allContainers, err := cm.client.ListContainers(opts)
if err != nil {
panic(err)
log.Errorf("%s (%T)", err.Error(), err)
return
}

for _, i := range allContainers {
Expand All @@ -137,9 +158,14 @@ func (cm *Docker) refreshAll() {
}

func (cm *Docker) Loop() {
for id := range cm.needsRefresh {
c := cm.MustGet(id)
cm.refresh(c)
for {
select {
case id := <-cm.needsRefresh:
c := cm.MustGet(id)
cm.refresh(c)
case <-cm.closed:
return
}
}
}

Expand All @@ -161,7 +187,7 @@ func (cm *Docker) MustGet(id string) *container.Container {
return c
}

// Get a single container, by ID
// Docker implements Connector
func (cm *Docker) Get(id string) (*container.Container, bool) {
cm.lock.Lock()
c, ok := cm.containers[id]
Expand All @@ -177,7 +203,7 @@ func (cm *Docker) delByID(id string) {
log.Infof("removed dead container: %s", id)
}

// All returns array of all containers, sorted by field
// Docker implements Connector
func (cm *Docker) All() (containers container.Containers) {
cm.lock.Lock()
for _, c := range cm.containers {
Expand Down
85 changes: 77 additions & 8 deletions connector/main.go
Expand Up @@ -3,16 +3,88 @@ package connector
import (
"fmt"
"sort"
"sync"
"time"

"github.com/bcicen/ctop/container"
"github.com/bcicen/ctop/logging"
)

var (
log = logging.Init()
enabled = make(map[string]func() Connector)
enabled = make(map[string]ConnectorFn)
)

// ConnectorFn initializes and returns a new Connector, or an error if
// the underlying runtime could not be reached.
type ConnectorFn func() (Connector, error)

// Connector is the common interface implemented by each container
// runtime backend (docker, runc, mock).
type Connector interface {
	// All returns a pre-sorted container.Containers of all discovered containers
	All() container.Containers
	// Get returns a single container.Container by ID
	Get(string) (*container.Container, bool)
	// Wait waits for the underlying connection to be lost before returning
	Wait() struct{}
}

// ConnectorSuper provides initial connection and retry on failure for
// an underlying Connector type
type ConnectorSuper struct {
	conn   Connector   // active connection; only meaningful while err == nil
	connFn ConnectorFn // factory used to (re)establish the connection
	err    error       // non-nil while connecting or disconnected
	lock   sync.RWMutex // guards conn and err
}

// NewConnectorSuper wraps connFn in a ConnectorSuper and starts its
// connect/retry loop in the background. Until the first successful
// connection, Get reports a "connecting..." error.
func NewConnectorSuper(connFn ConnectorFn) *ConnectorSuper {
	super := &ConnectorSuper{
		err:    fmt.Errorf("connecting..."),
		connFn: connFn,
	}
	go super.loop()
	return super
}

// Get returns the underlying Connector when it is connected; otherwise
// it returns nil together with the current connection error.
func (cs *ConnectorSuper) Get() (Connector, error) {
	cs.lock.RLock()
	conn, err := cs.conn, cs.err
	cs.lock.RUnlock()
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// setError records err as the current connection state under the write
// lock; a nil err marks the connector as healthy.
func (cs *ConnectorSuper) setError(err error) {
	cs.lock.Lock()
	cs.err = err
	cs.lock.Unlock()
}

// loop repeatedly establishes the underlying Connector via connFn,
// publishing it through Get on success and retrying on failure. When an
// established connection is lost (Wait returns), the cycle restarts.
func (cs *ConnectorSuper) loop() {
	// typed retry delay instead of an untyped const, so it can be both
	// slept on and logged directly ("3s")
	const retryInterval = 3 * time.Second
	for {
		log.Infof("initializing connector")

		conn, err := cs.connFn()
		if err != nil {
			cs.setError(fmt.Errorf("%s\n\nattempting to reconnect...", err))
			log.Errorf("failed to initialize connector: %s (%T)", err, err)
			log.Errorf("retrying in %s", retryInterval)
			time.Sleep(retryInterval)
			continue
		}

		// publish the connection and clear the error in a single
		// critical section, so a concurrent Get can never observe a nil
		// error paired with a stale or unset connection
		cs.lock.Lock()
		cs.conn = conn
		cs.err = nil
		cs.lock.Unlock()
		log.Infof("successfully initialized connector")

		// block until the connection is lost, then loop to reconnect
		cs.conn.Wait()
		cs.setError(fmt.Errorf("attempting to reconnect..."))
		log.Infof("connector closed")
	}
}

// Enabled returns names for all enabled connectors on the current platform
func Enabled() (a []string) {
for k, _ := range enabled {
Expand All @@ -22,14 +94,11 @@ func Enabled() (a []string) {
return a
}

func ByName(s string) (Connector, error) {
// ByName returns a ConnectorSuper for a given name, or error if the connector
// does not exist on the current platform
func ByName(s string) (*ConnectorSuper, error) {
if cfn, ok := enabled[s]; ok {
return cfn(), nil
return NewConnectorSuper(cfn), nil
}
return nil, fmt.Errorf("invalid connector type \"%s\"", s)
}

type Connector interface {
All() container.Containers
Get(string) (*container.Container, bool)
}
13 changes: 11 additions & 2 deletions connector/mock.go
Expand Up @@ -20,11 +20,11 @@ type Mock struct {
containers container.Containers
}

func NewMock() Connector {
func NewMock() (Connector, error) {
cs := &Mock{}
go cs.Init()
go cs.Loop()
return cs
return cs, nil
}

// Create Mock containers
Expand All @@ -41,6 +41,15 @@ func (cs *Mock) Init() {

}

// Wait blocks for a fixed 30s before returning, simulating an eventual
// connection loss so the retry loop reconnects (Connector interface).
func (cs *Mock) Wait() struct{} {
	// no real connection to monitor; a plain sleep replaces the
	// original goroutine+channel construction with identical effect
	time.Sleep(30 * time.Second)
	return struct{}{}
}

func (cs *Mock) makeContainer(aggression int64) {
collector := collector.NewMock(aggression)
manager := manager.NewMock()
Expand Down
56 changes: 33 additions & 23 deletions connector/runc.go
Expand Up @@ -54,35 +54,44 @@ type Runc struct {
factory libcontainer.Factory
containers map[string]*container.Container
libContainers map[string]libcontainer.Container
closed chan struct{}
needsRefresh chan string // container IDs requiring refresh
lock sync.RWMutex
}

func NewRunc() Connector {
func NewRunc() (Connector, error) {
opts, err := NewRuncOpts()
runcFailOnErr(err)
if err != nil {
return nil, err
}

factory, err := getFactory(opts)
runcFailOnErr(err)
if err != nil {
return nil, err
}

cm := &Runc{
opts: opts,
factory: factory,
containers: make(map[string]*container.Container),
libContainers: make(map[string]libcontainer.Container),
needsRefresh: make(chan string, 60),
closed: make(chan struct{}),
lock: sync.RWMutex{},
}

go func() {
for {
cm.refreshAll()
time.Sleep(5 * time.Second)
select {
case <-cm.closed:
return
case <-time.After(5 * time.Second):
cm.refreshAll()
}
}
}()
go cm.Loop()

return cm
return cm, nil
}

func (cm *Runc) GetLibc(id string) libcontainer.Container {
Expand Down Expand Up @@ -141,7 +150,11 @@ func (cm *Runc) refresh(id string) {
// Read runc root, creating any new containers
func (cm *Runc) refreshAll() {
list, err := ioutil.ReadDir(cm.opts.root)
runcFailOnErr(err)
if err != nil {
log.Errorf("%s (%T)", err.Error(), err)
close(cm.closed)
return
}

for _, i := range list {
if i.IsDir() {
Expand Down Expand Up @@ -199,14 +212,6 @@ func (cm *Runc) MustGet(id string) *container.Container {
return c
}

// Get a single container, by ID
func (cm *Runc) Get(id string) (*container.Container, bool) {
cm.lock.Lock()
defer cm.lock.Unlock()
c, ok := cm.containers[id]
return c, ok
}

// Remove containers by ID
func (cm *Runc) delByID(id string) {
cm.lock.Lock()
Expand All @@ -216,7 +221,18 @@ func (cm *Runc) delByID(id string) {
log.Infof("removed dead container: %s", id)
}

// All returns array of all containers, sorted by field
// Wait blocks until cm.closed is closed, signaling that this connector
// has shut down (Connector interface).
func (cm *Runc) Wait() struct{} {
	<-cm.closed
	return struct{}{}
}

// Get returns a single container.Container by ID (Connector interface).
func (cm *Runc) Get(id string) (*container.Container, bool) {
	// read-only map access: take the read lock so concurrent readers
	// do not serialize on the RWMutex
	cm.lock.RLock()
	defer cm.lock.RUnlock()
	c, ok := cm.containers[id]
	return c, ok
}

// Runc implements Connector
func (cm *Runc) All() (containers container.Containers) {
cm.lock.Lock()
for _, c := range cm.containers {
Expand All @@ -239,9 +255,3 @@ func getFactory(opts RuncOpts) (libcontainer.Factory, error) {
}
return libcontainer.New(opts.root, cgroupManager)
}

func runcFailOnErr(err error) {
if err != nil {
panic(fmt.Errorf("fatal runc error: %s", err))
}
}

0 comments on commit 98fcfe8

Please sign in to comment.