/
main.go
125 lines (106 loc) · 2.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"
"github.com/BurntSushi/toml"
sfuse "github.com/dimitarvdimitrov/sporkfs/fuse"
"github.com/dimitarvdimitrov/sporkfs/log"
"github.com/dimitarvdimitrov/sporkfs/spork"
"github.com/dimitarvdimitrov/sporkfs/store"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"go.uber.org/zap"
)
// main is the sporkfs entry point. It loads the configuration file named by
// the first positional command-line argument, creates the data directory,
// builds the spork service and the FUSE file system wrapping it, mounts that
// file system, and then blocks until the shared context is cancelled, at which
// point it destroys the vfs and waits for the background goroutines tracked by
// the wait group.
func main() {
	defer log.Sync()

	flag.Parse()
	cfg := parseConfig(flag.Arg(0))

	err := os.MkdirAll(cfg.DataDir, 0777)
	if err != nil {
		log.Fatal("couldn't create backing storage directory", zap.Error(err))
	}

	// One context ties every component together: whoever cancels it (signal
	// handler, fuse server, or the service itself) starts the shutdown below.
	ctx, cancel := context.WithCancel(context.Background())

	// Both channels are handed to the spork service and to the vfs.
	// NOTE(review): presumably the service produces and the vfs consumes
	// (see WatchInvalidations / WatchDeletions) — confirm in those packages.
	invalidated := make(chan *store.File)
	deleted := make(chan *store.File)

	service, err := spork.New(ctx, cancel, cfg, invalidated, deleted)
	if err != nil {
		log.Fatal("init", zap.Error(err))
	}
	vfs := sfuse.NewFS(&service, invalidated, deleted)

	var wg sync.WaitGroup
	startFuseServer(ctx, cancel, cfg.MountPoint, vfs, &wg)
	handleOsSignals(ctx, cancel)
	unmountWhenDone(ctx, cfg.MountPoint, &wg)

	<-ctx.Done()
	log.Info("shutting down...")
	vfs.Destroy()
	// Wait for the fuse serve loop and the unmount goroutine to finish.
	wg.Wait()
	log.Info("bye-bye")
}
// handleOsSignals arranges for cancel to be called as soon as the process
// receives SIGINT or SIGTERM, or as soon as ctx is done, whichever happens
// first. The signal handler is installed before the function returns; the
// waiting itself happens in a background goroutine, so the call never blocks.
//
// The signal registration is deliberately left in place after the first
// signal: further SIGINT/SIGTERM deliveries are absorbed instead of falling
// back to the default action and killing the process while it is cleaning up.
func handleOsSignals(ctx context.Context, cancel context.CancelFunc) {
	// signal.Notify does not block when sending, so an unbuffered channel can
	// silently lose a signal that arrives while nobody is receiving. A single
	// slot is enough because the first signal already triggers shutdown.
	signals := make(chan os.Signal, 1)

	// Register synchronously so a signal cannot slip through between this call
	// returning and the goroutine below being scheduled. os.Kill is not listed
	// because SIGKILL cannot be caught; requesting it has no effect.
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

	go func() {
		select {
		case <-signals:
		case <-ctx.Done():
		}
		cancel()
	}()
}
// unmountWhenDone starts a background goroutine that waits for ctx to be done
// and then lazily unmounts mountpoint by running `fusermount -zu`. A failed
// attempt is logged and retried after one second; the goroutine returns as
// soon as an attempt succeeds. The goroutine is registered on wg so the caller
// can wait for the unmount to complete.
//
// NOTE(review): the retry is unbounded, as it was before. If fusermount can
// never succeed (binary missing, or the mount already gone), wg.Wait in the
// caller will block forever — consider capping the number of attempts.
func unmountWhenDone(ctx context.Context, mountpoint string, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		// Deferred so the wait group is released on every exit path.
		defer wg.Done()

		<-ctx.Done()
		for {
			err := exec.Command("fusermount", "-zu", mountpoint).Run()
			if err == nil {
				break
			}
			log.Error("unmount", zap.Error(err))
			// Back off only after a failure. Previously the sleep sat in the
			// loop's post statement and also ran after a successful unmount,
			// delaying every shutdown by one second.
			time.Sleep(time.Second)
		}
		log.Info("unmounted vfs")
	}()
}
// startFuseServer mounts a FUSE file system named "sporkfs" at mountpoint and
// serves vfs on it from a background goroutine. A mount failure is reported
// through log.Fatal. When the serve loop returns (with or without an error) the
// goroutine marks itself done on wg and then calls cancel, so the end of the
// fuse server shuts down the rest of the program. Two further goroutines are
// started to run the vfs's invalidation and deletion watchers.
//
// ctx is accepted for symmetry with the other start-up helpers but is not
// used by this function.
func startFuseServer(ctx context.Context, cancel context.CancelFunc, mountpoint string, vfs sfuse.Fs, wg *sync.WaitGroup) {
	log.Info(fmt.Sprintf("mounting sporkfs at %s...", mountpoint))

	conn, mountErr := fuse.Mount(mountpoint, fuse.FSName("sporkfs"), fuse.VolumeName("sporkfs"))
	if mountErr != nil {
		log.Fatal("couldn't mount", zap.Error(mountErr))
	}
	log.Info("mount successful")

	// Route the fuse library's debug messages into the debug log.
	debugLog := func(m interface{}) { log.Debug(fmt.Sprint(m)) }
	server := fs.New(conn, &fs.Config{Debug: debugLog})

	serve := func() {
		log.Info("sporkfs started")
		if serveErr := server.Serve(vfs); serveErr != nil {
			log.Error("serve", zap.Error(serveErr))
		}
		log.Info("stopped fuse")
		wg.Done()
		cancel()
	}

	// Register with the wait group before the goroutine starts so a
	// concurrent Wait cannot miss it.
	wg.Add(1)
	go serve()

	go vfs.WatchInvalidations(server)
	go vfs.WatchDeletions()
}
// parseConfig decodes the TOML file at path dir into a spork.Config and
// returns it. A decoding error is reported through log.Fatal. After decoding,
// the top-level DataDir value is copied into the nested Config so both carry
// the same data directory.
func parseConfig(dir string) (cfg spork.Config) {
	if _, err := toml.DecodeFile(dir, &cfg); err != nil {
		log.Fatal("decoding config", zap.Error(err))
	}

	cfg.Config.DataDir = cfg.DataDir
	return cfg
}