Skip to content

Commit

Permalink
WATCHER: Make queue functional with poller
Browse files Browse the repository at this point in the history
  • Loading branch information
Cian911 committed Jan 9, 2022
1 parent 4a197e0 commit 91554dc
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 21 deletions.
19 changes: 16 additions & 3 deletions watcher/poller.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
package watcher

import "time"
import (
"log"
"time"
)

func Poll(interval int) {
ticker := time.NewTicker(interval * time.Second)
// Poll polls the queue for valid events given an interval (in seconds)
func (pw *PathWatcher) Poll(interval int) {
ticker := time.NewTicker(time.Duration(interval) * time.Second)
for {
select {
case <-ticker.C:
log.Printf("Polling... - Queue Size: %d\n", pw.Queue.Size())

for hsh, ev := range pw.Queue.Queue {
timeDiff := ev.Timestamp.Sub(time.Now())
if timeDiff < (time.Duration(-interval) * time.Second) {
pw.Notify(ev.Path, ev.Operation)
pw.Queue.Remove(hsh)
}
}
}
}
}
42 changes: 29 additions & 13 deletions watcher/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,45 @@ import (
"github.com/cian911/switchboard/event"
)

type Queue struct {
queue map[string]event.Event
// Q holds the Queue
type Q struct {
Queue map[string]event.Event
}

func New() *Queue {
return &Queue{
queue: make(map[string]event.Event),
// NewQueue create a new Q object
func NewQueue() *Q {
return &Q{
Queue: make(map[string]event.Event),
}
}

func (q *Queue) Add(hash string, ev event.Event) {
q.queue[hash] = ev
// Add adds to the queue
func (q *Q) Add(ev event.Event) {
q.Queue[Hash(ev)] = ev
}

func (q *Queue) Retrieve(hash string) event.Event {
return q.queue[hash]
// Retrieve get an item from the queue given a valid hash
func (q *Q) Retrieve(hash string) event.Event {
return q.Queue[hash]
}

func (q *Queue) Remove(hash string) {
delete(q.queue, hash)
// Remove removes an item from the queue
func (q *Q) Remove(hash string) {
delete(q.Queue, hash)
}

func generateHash(ev event.Event) string {
data := []byte(fmt.Sprintf("%s%s%s%s", ev.File, ev.Path, ev.Destination, ev.Ext))
// Size returns the size of the queue
func (q *Q) Size() int {
return len(q.Queue)
}

// Empty returns a bool indicating if the queue is empty or not
func (q *Q) Empty() bool {
return len(q.Queue) == 0
}

// Hash returns a md5 hash composed of an event File, Path, and Ext
func Hash(ev event.Event) string {
data := []byte(fmt.Sprintf("%s%s%s", ev.File, ev.Path, ev.Ext))
return fmt.Sprintf("%x", md5.Sum(data))
}
47 changes: 47 additions & 0 deletions watcher/queue_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,48 @@
package watcher

import (
"testing"
"time"

"github.com/cian911/switchboard/event"
)

func TestQueue(t *testing.T) {
t.Run("It adds one event to the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent()

q.Add(*ev)

if q.Size() != 1 {
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size())
}
})

t.Run("It updates the event in the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent()

q.Add(*ev)
q.Add(*ev)
q.Add(*ev)

if q.Size() != 1 {
// Queue size should not increase
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size())
}
})
}

func setupQueue() *Q {
return NewQueue()
}

func testEvent() *event.Event {
return &event.Event{
File: "sample.txt",
Path: "/var/sample.txt",
Ext: ".txt",
Timestamp: time.Now(),
}
}
30 changes: 25 additions & 5 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"os"
"path/filepath"
"time"

"github.com/cian911/switchboard/event"
"github.com/cian911/switchboard/utils"
Expand All @@ -18,7 +19,7 @@ type Producer interface {
// Unregister a consumer from the producer
Unregister(consumer *Consumer)
// Notify consumers of an event
notify(path, event string)
Notify(path, event string)
// Observe the producer
Observe()
}
Expand All @@ -38,6 +39,8 @@ type Consumer interface {
type PathWatcher struct {
// List of consumers
Consumers []*Consumer
// Queue
Queue *Q
// Watcher instance
Watcher fsnotify.Watcher
// Path to watch
Expand Down Expand Up @@ -67,6 +70,7 @@ func (pc *PathConsumer) Receive(path, ev string) {
Path: path,
Destination: pc.Destination,
Ext: utils.ExtractFileExt(path),
Timestamp: time.Now(),
Operation: ev,
}

Expand All @@ -86,7 +90,7 @@ func (pc *PathConsumer) Receive(path, ev string) {
func (pc *PathConsumer) Process(e *event.Event) {
err := e.Move(e.Path, "")
if err != nil {
log.Fatalf("Unable to move file from { %s } to { %s }: %v", e.Path, e.Destination, err)
log.Printf("Unable to move file from { %s } to { %s }: %v\n", e.Path, e.Destination, err)
} else {
log.Println("Event has been processed.")
}
Expand Down Expand Up @@ -134,6 +138,11 @@ func (pw *PathWatcher) Unregister(consumer *Consumer) {

// Observe the producer
func (pw *PathWatcher) Observe() {
pw.Queue = NewQueue()
go func() {
pw.Poll(3)
}()

watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Could not create new watcher: %v", err)
Expand Down Expand Up @@ -164,9 +173,10 @@ func (pw *PathWatcher) Observe() {
case event := <-watcher.Events:
if event.Op.String() == "CREATE" && utils.IsDir(event.Name) {
watcher.Add(event.Name)
} else if event.Op.String() == "CREATE" || event.Op.String() == "WRITE" {
ev := newEvent(event.Name, event.Op.String())
pw.Queue.Add(*ev)
}

pw.notify(event.Name, event.Op.String())
case err := <-watcher.Errors:
log.Printf("Watcher encountered an error when observing %s: %v", pw.Path, err)
}
Expand All @@ -177,8 +187,18 @@ func (pw *PathWatcher) Observe() {
}

// Notify consumers of an event
func (pw *PathWatcher) notify(path, event string) {
func (pw *PathWatcher) Notify(path, event string) {
for _, cons := range pw.Consumers {
(*cons).Receive(path, event)
}
}

func newEvent(path, ev string) *event.Event {
return &event.Event{
File: filepath.Base(path),
Path: path,
Ext: utils.ExtractFileExt(path),
Timestamp: time.Now(),
Operation: ev,
}
}

0 comments on commit 91554dc

Please sign in to comment.