Skip to content

Commit

Permalink
More reusable pkg, step 1
Browse files Browse the repository at this point in the history
In the perspective of a reusable, generic pkg/ content:
* Reduce the scope expected from logger, and reduce it to a minimal interface
* Start to remove KfConfig (remains: observer and controller): pass
  naked arguments directly to packages constructors
  • Loading branch information
bpineau committed Apr 25, 2018
1 parent 0f1ea3d commit 224b969
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 119 deletions.
6 changes: 3 additions & 3 deletions cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func runE(cmd *cobra.Command, args []string) (err error) {
ResyncIntv: resync,
}

repo, err := git.New(conf).Start()
repo, err := git.New(logger, dryRun, localDir, gitURL).Start()
if err != nil {
conf.Logger.Fatalf("failed to start git repo handler: %v", err)
}

evts := event.New()
reco := recorder.New(conf, evts).Start()
reco := recorder.New(logger, evts, localDir, resyncInt*2, dryRun).Start()
obsv := observer.New(conf, evts, &controller.Factory{}).Start()
http := health.New(conf).Start()
http := health.New(logger, healthP).Start()

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
Expand Down
29 changes: 17 additions & 12 deletions pkg/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,54 @@ import (
"fmt"
"io"
"net/http"

"github.com/bpineau/katafygio/config"
)

type logger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
}

// Listener is an http health check listener
type Listener struct {
config *config.KfConfig
logger logger
port int
donech chan struct{}
srv *http.Server
}

// New create a new http health check listener
func New(config *config.KfConfig) *Listener {
func New(log logger, port int) *Listener {
return &Listener{
config: config,
logger: log,
port: port,
donech: make(chan struct{}),
srv: nil,
}
}

func (h *Listener) healthCheckReply(w http.ResponseWriter, r *http.Request) {
if _, err := io.WriteString(w, "ok\n"); err != nil {
h.config.Logger.Warningf("Failed to reply to http healtcheck from %s: %s\n", r.RemoteAddr, err)
h.logger.Errorf("Failed to reply to http healtcheck from %s: %s\n", r.RemoteAddr, err)
}
}

// Start exposes an http healthcheck handler
func (h *Listener) Start() *Listener {
if h.config.HealthPort == 0 {
if h.port == 0 {
return h
}

h.config.Logger.Info("Starting http healtcheck handler")
h.logger.Infof("Starting http healtcheck handler")

h.srv = &http.Server{Addr: fmt.Sprintf(":%d", h.config.HealthPort)}
h.srv = &http.Server{Addr: fmt.Sprintf(":%d", h.port)}

http.HandleFunc("/health", h.healthCheckReply)

go func() {
defer close(h.donech)
err := h.srv.ListenAndServe()
if err != nil && err.Error() != "http: Server closed" {
h.config.Logger.Errorf("healthcheck server failed: %v", err)
h.logger.Errorf("healthcheck server failed: %v", err)
}
}()

Expand All @@ -61,11 +66,11 @@ func (h *Listener) Stop() {
return
}

h.config.Logger.Info("Stopping http healtcheck handler")
h.logger.Infof("Stopping http healtcheck handler")

err := h.srv.Shutdown(context.TODO())
if err != nil {
h.config.Logger.Warningf("failed to stop http healtcheck handler: %v", err)
h.logger.Errorf("failed to stop http healtcheck handler: %v", err)
}

<-h.donech
Expand Down
22 changes: 6 additions & 16 deletions pkg/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,29 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"

"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/log"
)

func TestNoopHealth(t *testing.T) {
var logs = log.New("error", "", "test")

conf := &config.KfConfig{
Logger: log.New("error", "", "test"),
HealthPort: 0,
}
func TestNoopHealth(t *testing.T) {

// shouldn't panic with 0 as port
hc := New(conf)
hc := New(logs, 0)
_ = hc.Start()
hc.Stop()

conf.HealthPort = -42
hc = New(conf)
hc = New(logs, -42)
_ = hc.Start()
hc.Stop()
hook := hc.config.Logger.Hooks[logrus.InfoLevel][0].(*test.Hook)
hook := logs.Hooks[logrus.InfoLevel][0].(*test.Hook)
if len(hook.Entries) != 1 {
t.Error("Failed to log an issue with a bogus port")
}
}

func TestHealthCheck(t *testing.T) {
conf := &config.KfConfig{
Logger: log.New("info", "", "test"),
HealthPort: 0,
}

hc := New(conf)
hc := New(logs, 0)

req, err := http.NewRequest("GET", "/health", nil)
if err != nil {
Expand Down
58 changes: 32 additions & 26 deletions pkg/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/spf13/afero"

"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/event"
)

Expand All @@ -22,6 +21,11 @@ var (
crc64Table = crc64.MakeTable(crc64.ECMA)
)

type logger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
}

// activeFiles will contain a list of active (present in cluster) objets; we'll
// use that to periodically find and garbage collect stale objets in the git repos
// (ie. if some objects were delete from cluster while katafygio was not running),
Expand All @@ -30,36 +34,42 @@ type activeFiles map[string]uint64

// Listener receive events from controllers and save them to disk as yaml files
type Listener struct {
config *config.KfConfig
logger logger
events event.Notifier
actives activeFiles
activesLock sync.RWMutex
localDir string
gcInterval time.Duration
dryRun bool
stopch chan struct{}
donech chan struct{}
}

// New creates a new event Listener
func New(config *config.KfConfig, events event.Notifier) *Listener {
func New(log logger, events event.Notifier, localDir string, gcInterval int, dryRun bool) *Listener {
return &Listener{
config: config,
events: events,
actives: activeFiles{},
stopch: make(chan struct{}),
donech: make(chan struct{}),
logger: log,
events: events,
actives: activeFiles{},
localDir: localDir,
dryRun: dryRun,
gcInterval: time.Duration(gcInterval) * time.Second,
stopch: make(chan struct{}),
donech: make(chan struct{}),
}
}

// Start continuously receive events and saves them to disk as files
func (w *Listener) Start() *Listener {
w.config.Logger.Info("Starting event recorder")
err := appFs.MkdirAll(filepath.Clean(w.config.LocalDir), 0700)
w.logger.Infof("Starting event recorder")
err := appFs.MkdirAll(filepath.Clean(w.localDir), 0700)
if err != nil {
panic(fmt.Sprintf("Can't create directory %s: %v", w.config.LocalDir, err))
panic(fmt.Sprintf("Can't create directory %s: %v", w.localDir, err))
}

go func() {
evCh := w.events.ReadChan()
gcTick := time.NewTicker(w.config.ResyncIntv * 2)
gcTick := time.NewTicker(w.gcInterval)
defer gcTick.Stop()
defer close(w.donech)

Expand All @@ -80,15 +90,15 @@ func (w *Listener) Start() *Listener {

// Stop halts the recorder service
func (w *Listener) Stop() {
w.config.Logger.Info("Stopping event recorder")
w.logger.Infof("Stopping event recorder")
close(w.stopch)
<-w.donech
}

func (w *Listener) processNextEvent(ev *event.Notification) {
path, err := getPath(w.config.LocalDir, ev)
path, err := getPath(w.localDir, ev)
if err != nil {
w.config.Logger.Errorf("failed to get %s path: %v", ev.Key, err)
w.logger.Errorf("failed to get %s path: %v", ev.Key, err)
}

switch ev.Action {
Expand All @@ -99,7 +109,7 @@ func (w *Listener) processNextEvent(ev *event.Notification) {
}

if err != nil {
w.config.Logger.Errorf("failed to delete or save %s: %v", ev.Key, err)
w.logger.Errorf("failed to delete or save %s: %v", ev.Key, err)
}
}

Expand All @@ -115,8 +125,7 @@ func getPath(root string, ev *event.Notification) (string, error) {
}

func (w *Listener) remove(file string) error {
w.config.Logger.Debugf("Removing %s from disk", file)
if w.config.DryRun {
if w.dryRun {
return nil
}

Expand All @@ -127,14 +136,12 @@ func (w *Listener) remove(file string) error {
}

func (w *Listener) relativePath(file string) string {
root := filepath.Clean(w.config.LocalDir)
root := filepath.Clean(w.localDir)
return strings.Replace(file, root+"/", "", 1)
}

func (w *Listener) save(file string, data []byte) error {
w.config.Logger.Debugf("Saving %s to disk", file)

if w.config.DryRun {
if w.dryRun {
return nil
}

Expand Down Expand Up @@ -182,7 +189,7 @@ func (w *Listener) save(file string, data []byte) error {
func (w *Listener) deleteObsoleteFiles() {
w.activesLock.RLock()
defer w.activesLock.RUnlock()
root := filepath.Clean(w.config.LocalDir)
root := filepath.Clean(w.localDir)

err := afero.Walk(appFs, root, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
Expand All @@ -198,15 +205,14 @@ func (w *Listener) deleteObsoleteFiles() {
return nil
}

w.config.Logger.Debugf("Removing %s from disk", path)
if !w.config.DryRun {
if !w.dryRun {
return appFs.Remove(filepath.Clean(path))
}

return nil
})

if err != nil {
w.config.Logger.Warnf("failed to gc some files: %v", err)
w.logger.Errorf("failed to gc some files: %v", err)
}
}

0 comments on commit 224b969

Please sign in to comment.