diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index 75faa7e..ee9df9c 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -4,6 +4,7 @@ package recorder import ( "fmt" + "hash/crc64" "os" "path/filepath" "strings" @@ -16,12 +17,16 @@ import ( "github.com/bpineau/katafygio/pkg/event" ) -var appFs = afero.NewOsFs() +var ( + appFs = afero.NewOsFs() + crc64Table = crc64.MakeTable(crc64.ECMA) +) // activeFiles will contain a list of active (present in cluster) objets; we'll // use that to periodically find and garbage collect stale objets in the git repos -// (ie. if some objects were delete from cluster while katafygio was not running). -type activeFiles map[string]bool +// (ie. if some objects were delete from cluster while katafygio was not running), +// and to skip already existing and unchanged files. +type activeFiles map[string]uint64 // Listener receive events from controllers and save them to disk as yaml files type Listener struct { @@ -133,6 +138,15 @@ func (w *Listener) save(file string, data []byte) error { return nil } + csum := crc64.Checksum(data, crc64Table) + + w.activesLock.RLock() + prevsum, ok := w.actives[w.relativePath(file)] + w.activesLock.RUnlock() + if ok && prevsum == csum { + return nil + } + dir := filepath.Clean(filepath.Dir(file)) err := appFs.MkdirAll(dir, 0700) @@ -140,10 +154,6 @@ func (w *Listener) save(file string, data []byte) error { return fmt.Errorf("can't create local directory %s: %v", dir, err) } - w.activesLock.Lock() - w.actives[w.relativePath(file)] = true - w.activesLock.Unlock() - tmpf, err := afero.TempFile(appFs, "", "katafygio") if err != nil { return fmt.Errorf("failed to create a temporary file: %v", err) @@ -162,6 +172,10 @@ func (w *Listener) save(file string, data []byte) error { return fmt.Errorf("failed to rename %s to %s: %v", tmpf.Name(), file, err) } + w.activesLock.Lock() + w.actives[w.relativePath(file)] = csum + w.activesLock.Unlock() + return nil } diff --git a/pkg/recorder/recorder_test.go b/pkg/recorder/recorder_test.go index 4e07c95..e109d99 100644 --- a/pkg/recorder/recorder_test.go +++ b/pkg/recorder/recorder_test.go @@ -36,6 +36,7 @@ func TestRecorder(t *testing.T) { evt.Send(newNotif(event.Upsert, "foo1")) evt.Send(newNotif(event.Upsert, "foo2")) evt.Send(newNotif(event.Delete, "foo1")) + evt.Send(newNotif(event.Upsert, "foo2")) rec.Stop() // to flush ongoing fs operations