Skip to content

Commit

Permalink
WATCHER: Fix bug wherein multiple consumers will try and process the …
Browse files Browse the repository at this point in the history
…same event. Add tests for queue and poller.
  • Loading branch information
Cian911 committed Jan 10, 2022
1 parent 041bf85 commit f987ec1
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 32 deletions.
24 changes: 13 additions & 11 deletions watcher/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ import (

// Poll polls the queue for valid events given an interval (in seconds)
func (pw *PathWatcher) Poll(interval int) {
ticker := time.NewTicker(time.Duration(interval) * time.Second)
for {
select {
case <-ticker.C:
log.Printf("Polling... - Queue Size: %d\n", pw.Queue.Size())
go func() {
ticker := time.NewTicker(time.Duration(interval) * time.Second)
for {
select {
case <-ticker.C:
log.Printf("Polling... - Queue Size: %d\n", pw.Queue.Size())

for hsh, ev := range pw.Queue.Queue {
timeDiff := ev.Timestamp.Sub(time.Now())
if timeDiff < (time.Duration(-interval) * time.Second) {
pw.Notify(ev.Path, ev.Operation)
pw.Queue.Remove(hsh)
for hsh, ev := range pw.Queue.Queue {
timeDiff := ev.Timestamp.Sub(time.Now())
if timeDiff < (time.Duration(-interval) * time.Second) {
pw.Notify(ev.Path, ev.Operation)
pw.Queue.Remove(hsh)
}
}
}
}
}
}()
}
34 changes: 34 additions & 0 deletions watcher/poller_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,35 @@
package watcher

import (
"testing"
"time"
)

var (
pollInterval = 1
)

func TestPoller(t *testing.T) {
t.Run("It successfully notifies of a new event", func(t *testing.T) {
pw := setupPathwatcher("/tmp")
pw.Poll(pollInterval)

ev := eventSetup(t)
pw.Queue.Add(*ev)

if pw.Queue.Size() != 1 {
t.Errorf("Queue size did not increase. want=%d, got=%d", 1, pw.Queue.Size())
}
<-time.After(3 * time.Second)

if pw.Queue.Size() != 0 {
t.Errorf("Queue size did not decrease. want=%d, got=%d", 0, pw.Queue.Size())
}
})
}

func setupPathwatcher(path string) *PathWatcher {
return &PathWatcher{
Queue: NewQueue(),
}
}
58 changes: 52 additions & 6 deletions watcher/queue_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package watcher

import (
"reflect"
"testing"
"time"

"github.com/cian911/switchboard/event"
)

var (
gFile = "sample.txt"
gPath = "/var/sample.txt"
gExt = ".txt"
)

func TestQueue(t *testing.T) {
t.Run("It adds one event to the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent()
ev := testEvent(gFile, gPath, gExt)

q.Add(*ev)

Expand All @@ -21,7 +28,7 @@ func TestQueue(t *testing.T) {

t.Run("It updates the event in the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent()
ev := testEvent(gFile, gPath, gExt)

q.Add(*ev)
q.Add(*ev)
Expand All @@ -32,17 +39,56 @@ func TestQueue(t *testing.T) {
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size())
}
})

t.Run("It gets an item from the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent(gFile, gPath, gExt)

hash := Hash(*ev)
q.Add(*ev)
e := q.Retrieve(hash)

if !reflect.DeepEqual(ev, &e) {
t.Errorf("Events are not the same. want=%v, got=%v", ev, e)
}
})

t.Run("It removes an item from the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent(gFile, gPath, gExt)

hash := Hash(*ev)
q.Add(*ev)
q.Remove(hash)

if q.Size() != 0 {
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 0, q.Size())
}
})

t.Run("It returns a unique hash for a given event", func(t *testing.T) {
ev1 := testEvent(gFile, gPath, gExt)
ev2 := testEvent("sample2.txt", "/var/sample2.txt", ".txt")

h1 := Hash(*ev1)
h2 := Hash(*ev2)

if h1 == h2 {
t.Errorf("Hashes are the same when they shouldn't be. want=%s, got=%s", h1, h2)
}
})
}

func setupQueue() *Q {
return NewQueue()
}

func testEvent() *event.Event {
func testEvent(file, path, ext string) *event.Event {
return &event.Event{
File: "sample.txt",
Path: "/var/sample.txt",
Ext: ".txt",
File: file,
Path: path,
Ext: ext,
Timestamp: time.Now(),
}

}
22 changes: 8 additions & 14 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Producer interface {
// Notify consumers of an event
Notify(path, event string)
// Observe the producer
Observe()
Observe(pollInterval int)
}

// Consumer interface
Expand Down Expand Up @@ -61,10 +61,6 @@ type PathConsumer struct {
// Receive takes a path and an event operation, determines its validity
// and passes it to be processed it if valid
func (pc *PathConsumer) Receive(path, ev string) {
log.Printf("Event Received: %s, Path: %s\n", ev, path)

// TODO: Move IsNewDirEvent to utils and call func on event struct
// TODO: If is a dir event, there should not be a file ext
e := &event.Event{
File: filepath.Base(path),
Path: path,
Expand All @@ -74,14 +70,14 @@ func (pc *PathConsumer) Receive(path, ev string) {
Operation: ev,
}

if e.IsNewDirEvent() {
log.Println("Event is a new dir")
if !e.IsNewDirEvent() && ev != pc.Ext && filepath.Dir(path) != pc.Path {
// Do not process event for consumers not watching file
return
}

// Recursively scan dir for items with our ext
// Then add all recursive dirs as paths
if e.IsNewDirEvent() {
pc.ProcessDirEvent(e)
} else if e.IsValidEvent(pc.Ext) {
log.Println("Event is valid")
pc.Process(e)
}
}
Expand Down Expand Up @@ -137,11 +133,9 @@ func (pw *PathWatcher) Unregister(consumer *Consumer) {
}

// Observe the producer
func (pw *PathWatcher) Observe() {
func (pw *PathWatcher) Observe(pollInterval int) {
pw.Queue = NewQueue()
go func() {
pw.Poll(3)
}()
pw.Poll(pollInterval)

watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/cian911/switchboard/event"
"github.com/cian911/switchboard/utils"
Expand Down Expand Up @@ -59,7 +60,6 @@ func TestWatcher(t *testing.T) {
pw, pc := setup(ev.Path, ev.Destination, ev.Ext)
pw.Register(&pc)
pw.Unregister(&pc)
t.Log("PATH: " + ev.Path + " FILE: " + ev.File)

for i := 1; i <= 3; i++ {
file := createTempFile(ev.Path, ".txt", t)
Expand Down Expand Up @@ -112,6 +112,7 @@ func eventSetup(t *testing.T) *event.Event {
Destination: t.TempDir(),
Ext: ext,
Operation: "CREATE",
Timestamp: time.Now(),
}
}

Expand Down

0 comments on commit f987ec1

Please sign in to comment.