Skip to content

Commit

Permalink
WATCHER: Add functionality to process directory events containing val…
Browse files Browse the repository at this point in the history
…id files.
  • Loading branch information
Cian911 committed Dec 28, 2021
1 parent de4b56c commit 4c8d669
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 9 deletions.
40 changes: 32 additions & 8 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Consumer interface {
Receive(path, event string)
// Process an event
Process(e *event.Event)
// Process a dir event
ProcessDirEvent(e *event.Event)
}

// PathWatcher is a producer that watches a path for events
Expand Down Expand Up @@ -58,6 +60,7 @@ type PathConsumer struct {
func (pc *PathConsumer) Receive(path, ev string) {
log.Printf("Event Received: %s, Path: %s\n", ev, path)

// TODO: Move IsNewDirEvent to utils and call func on event struct
e := &event.Event{
File: filepath.Base(path),
Path: path,
Expand All @@ -71,27 +74,48 @@ func (pc *PathConsumer) Receive(path, ev string) {

if e.IsValidEvent(pc.Ext) {
log.Println("Event is valid")
pc.Process(e)
} else if e.IsNewDirEvent() {
log.Println("Event is a new dir")

if e.IsNewDirEvent() {
log.Println("Event is a new dir")
// Recursively scan dir for items with our ext
// Then add all recursive dirs as paths
} else {
pc.Process(e)
}
// Recursively scan dir for items with our ext
// Then add all recursive dirs as paths
pc.ProcessDirEvent(e)
}
}

// Process takes an event and moves it to the destination
func (pc *PathConsumer) Process(e *event.Event) {
err := e.Move()
err := e.Move(e.Path)
if err != nil {
log.Fatalf("Unable to move file from { %s } to { %s }: %v", e.Path, e.Destination, err)
} else {
log.Println("Event has been processed.")
}
}

// ProcessDirEvent takes an event and scans files ext
func (pc *PathConsumer) ProcessDirEvent(e *event.Event) {
files, err := utils.ScanFilesInDir(e.Path)
if err != nil {
log.Fatalf("Unable to scan files in dir event: error: %v, path: %s", err, e.Path)
}

// TODO: Copy files to destination nad only move on last file in dir that matches ext

for file, _ := range files {
if utils.ExtractFileExt(file) == pc.Ext {
ev := event.New(file, e.Path, e.Destination, pc.Ext)
log.Printf("EVENT DIR: %v", ev)
err = ev.Move(ev.Path)

if err != nil {
log.Printf("Unable to move file: %s from path: %s to dest: %s: %v", file, ev.Path, ev.Destination, err)
}
}
}
}

// AddPath adds a path to the watcher
func (pw *PathWatcher) AddPath(path string) {
pw.Watcher.Add(path)
Expand Down
82 changes: 81 additions & 1 deletion watcher/watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
package watcher

import "testing"
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/cian911/switchboard/event"
"github.com/cian911/switchboard/utils"
)

const (
path = "/tmp"
destination = "/test"
)

var (
e = &event.Event{
File: "readme.txt",
Path: "/input",
Destination: "/output",
Ext: ".txt",
Operation: "CREATE",
}

ext = ".txt"
file = "sample.txt"
)

func TestWatcher(t *testing.T) {
t.Run("It registers a consumer", func(t *testing.T) {
pw, pc := setup()
Expand All @@ -28,6 +50,37 @@ func TestWatcher(t *testing.T) {
t.Fatalf("Consumer was not unregistered when it should have been. want=%d, got=%d", 0, len(pw.(*PathWatcher).Consumers))
}
})

t.Run("It processes a new dir event", func(t *testing.T) {
pw, pc := setup()

pw.Register(&pc)
pw.Unregister(&pc)

ev := eventSetup(t)
ev.Path = t.TempDir()
ev.File = utils.ExtractFileExt(ev.Path)

for i := 1; i <= 3; i++ {
createTempFile(ev.Path, ".txt", t)
}

pc.Receive(ev.Path, "CREATE")

// Scan dest dir for how many files it contains
// if want == got, all files have been moved successfully
filesInDir, err := utils.ScanFilesInDir(ev.Destination)
if err != nil {
t.Fatalf("Could not scan all files in destination dir: %v", err)
}

want := 3
got := len(filesInDir)

if want != got {
t.Fatalf("want: %d != got: %d", want, got)
}
})
}

func setup() (Producer, Consumer) {
Expand All @@ -42,3 +95,30 @@ func setup() (Producer, Consumer) {

return pw, pc
}

func eventSetup(t *testing.T) *event.Event {
path := t.TempDir()
_, err := ioutil.TempFile(path, file)

if err != nil {
t.Fatalf("Unable to create temp file: %v", err)
}

return &event.Event{
File: file,
Path: path,
Destination: t.TempDir(),
Ext: ext,
Operation: "CREATE",
}
}

func createTempFile(path, ext string, t *testing.T) {
data := []byte("hello\nworld\n")
fileName := fmt.Sprintf("%d.%s", time.Now().Unix(), ext)
err := os.WriteFile(fmt.Sprintf("%s/%s", path, fileName), data, 0644)

if err != nil {
t.Fatalf("Could not create test file: %v", err)
}
}

0 comments on commit 4c8d669

Please sign in to comment.