Skip to content

Commit

Permalink
Merge pull request #26 from bpineau/unittests_for_recorder
Browse files Browse the repository at this point in the history
Unit tests for recorder
  • Loading branch information
bpineau committed Apr 12, 2018
2 parents 6580918 + f9d61c7 commit a25735f
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
kubernetes-backup
dist/*
katafygio
profile.cov
21 changes: 15 additions & 6 deletions pkg/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ import (
"sync"
"time"

"github.com/spf13/afero"

"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/event"
)

var appFs = afero.NewOsFs()

// activeFiles will contain a list of active (present in cluster) objets; we'll
// use that to periodically find and garbage collect stale objets in the git repos
// (ie. if some objects were delete from cluster while katafygio was not running).
Expand Down Expand Up @@ -41,7 +45,7 @@ func New(config *config.KfConfig, events event.Notifier) *Listener {
// Start receive events and saves them to disk as files
func (w *Listener) Start() *Listener {
w.config.Logger.Info("Starting event recorder")
err := os.MkdirAll(filepath.Clean(w.config.LocalDir), 0700)
err := appFs.MkdirAll(filepath.Clean(w.config.LocalDir), 0700)
if err != nil {
panic(fmt.Sprintf("Can't create directory %s: %v", w.config.LocalDir, err))
}
Expand Down Expand Up @@ -114,7 +118,7 @@ func (w *Listener) remove(file string) error {
w.activesLock.Lock()
delete(w.actives, file)
w.activesLock.Unlock()
return os.Remove(filepath.Clean(file))
return appFs.Remove(filepath.Clean(file))
}

func (w *Listener) relativePath(file string) string {
Expand All @@ -131,7 +135,7 @@ func (w *Listener) save(file string, data string) error {

dir := filepath.Clean(filepath.Dir(file))

err := os.MkdirAll(dir, 0700)
err := appFs.MkdirAll(dir, 0700)
if err != nil {
return fmt.Errorf("can't create local directory %s: %v", dir, err)
}
Expand All @@ -140,7 +144,7 @@ func (w *Listener) save(file string, data string) error {
w.actives[w.relativePath(file)] = true
w.activesLock.Unlock()

f, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
f, err := appFs.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return fmt.Errorf("failed to create %s on disk: %v", file, err)
}
Expand All @@ -163,7 +167,7 @@ func (w *Listener) deleteObsoleteFiles() {
defer w.activesLock.RUnlock()
root := filepath.Clean(w.config.LocalDir)

err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
err := afero.Walk(appFs, root, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
Expand All @@ -177,7 +181,12 @@ func (w *Listener) deleteObsoleteFiles() {
return nil
}

return os.Remove(filepath.Clean(path))
w.config.Logger.Debugf("Removing %s from disk", path)
if !w.config.DryRun {
return appFs.Remove(filepath.Clean(path))
}

return nil
})

if err != nil {
Expand Down
152 changes: 152 additions & 0 deletions pkg/recorder/recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package recorder

import (
"testing"
"time"

"github.com/spf13/afero"

"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/event"
"github.com/bpineau/katafygio/pkg/log"
)

func newNotif(action event.Action, key string) *event.Notification {
return &event.Notification{
Action: action,
Key: key,
Kind: "foo",
Object: "bar",
}
}

func TestRecorder(t *testing.T) {
appFs = afero.NewMemMapFs()

evt := event.New()

conf := &config.KfConfig{
Logger: log.New("info", "", "test"),
LocalDir: "/tmp/ktest", // fake dir (in memory fs provided by Afero)
ResyncIntv: 60,
}

rec := New(conf, evt).Start()

evt.Send(newNotif(event.Upsert, "foo1"))
evt.Send(newNotif(event.Upsert, "foo2"))
evt.Send(newNotif(event.Delete, "foo1"))

rec.Stop() // to flush ongoing fs operations

exist, _ := afero.Exists(appFs, conf.LocalDir+"/foo-foo2.yaml")
if !exist {
t.Error("foo-foo2.yaml should exist; upsert event didn't propagate")
}

exist, _ = afero.Exists(appFs, conf.LocalDir+"/foo-foo1.yaml")
if exist {
t.Error("foo-foo1.yaml shouldn't exist, delete event didn't propagate")
}

rogue := conf.LocalDir + "/roguefile.yaml"
_ = afero.WriteFile(appFs, rogue, []byte{42}, 0600)
_ = afero.WriteFile(appFs, rogue+".txt", []byte{42}, 0600)
rec.deleteObsoleteFiles()

exist, _ = afero.Exists(appFs, rogue)
if exist {
t.Errorf("%s file should have been garbage collected", rogue)
}

exist, _ = afero.Exists(appFs, rogue+".txt")
if !exist {
t.Errorf("garbage collection should only touch .yaml files")
}
}

func TestDryRunRecorder(t *testing.T) {
appFs = afero.NewMemMapFs()

conf := &config.KfConfig{
Logger: log.New("info", "", "test"),
LocalDir: "/tmp/ktest",
ResyncIntv: 60,
}

conf.DryRun = true
dryevt := event.New()
dryrec := New(conf, dryevt).Start()
dryevt.Send(newNotif(event.Upsert, "foo3"))
dryevt.Send(newNotif(event.Upsert, "foo4"))
dryevt.Send(newNotif(event.Delete, "foo4"))
dryrec.Stop()

exist, _ := afero.Exists(appFs, conf.LocalDir+"/foo-foo3.yaml")
if exist {
t.Error("foo-foo3.yaml was created but we're in dry-run mode")
}

rogue := conf.LocalDir + "/roguefile.yaml"
_ = afero.WriteFile(appFs, rogue, []byte{42}, 0600)
dryrec.deleteObsoleteFiles()

exist, _ = afero.Exists(appFs, rogue)
if !exist {
t.Errorf("garbage collection shouldn't remove files in dry-run mode")
}
}

// testing behavior on fs errors (we shouldn't block the program)
func TestFailingFSRecorder(t *testing.T) {
appFs = afero.NewMemMapFs()

evt := event.New()

conf := &config.KfConfig{
Logger: log.New("info", "", "test"),
LocalDir: "/tmp/ktest", // fake dir (in memory fs provided by Afero)
ResyncIntv: 60,
}

rec := New(conf, evt).Start()

_ = afero.WriteFile(appFs, conf.LocalDir+"/foo.yaml", []byte{42}, 0600)

// switching to failing (read-only) filesystem
appFs = afero.NewReadOnlyFs(appFs)

err := rec.save("foo", "bar")
if err == nil {
t.Error("save should return an error in case of failure")
}

// shouldn't panic in case of failures
rec.deleteObsoleteFiles()

// shouldn't block (the controllers event loop will retry anyway)
ch := make(chan struct{})
go func() {
evt.Send(newNotif(event.Upsert, "foo3"))
evt.Send(newNotif(event.Upsert, "foo4"))
ch <- struct{}{}
}()

select {
case <-ch:
case <-time.After(5 * time.Second):
t.Error("recorder shouldn't block in case of fs failure")
}

// back to normal operations
rec.Stop() // just to flush ongoing ops before switch filesystem
appFs = afero.NewMemMapFs()
rec.Start()
evt.Send(newNotif(event.Upsert, "foo2"))
rec.Stop() // flush ongoing ops

exist, _ := afero.Exists(appFs, conf.LocalDir+"/foo-foo2.yaml")
if !exist {
t.Error("foo-foo2.yaml should exist; recorder should recover from fs failures")
}
}

0 comments on commit a25735f

Please sign in to comment.