Skip to content

Commit

Permalink
bidirectional sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ddollar committed May 24, 2016
1 parent 3b6e9cc commit 8ce8ffd
Showing 1 changed file with 146 additions and 31 deletions.
177 changes: 146 additions & 31 deletions api/manifest/manifest.go
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -1212,10 +1213,19 @@ func watchWalker(files Files, local string, adds map[string]bool, lock sync.Mute
}

if files[path] != info.ModTime() {
lock.Lock()
adds[path] = true
files[path] = info.ModTime()
lock.Unlock()

blockLocalAddsLock.Lock()

if blockLocalAdds[path] > 0 {
blockLocalAdds[path] -= 1
} else {
lock.Lock()
adds[path] = true
lock.Unlock()
}

blockLocalAddsLock.Unlock()
}

return nil
Expand All @@ -1234,7 +1244,7 @@ func processAdds(prefix string, adds map[string]bool, lock sync.Mutex, syncs []S

lock.Lock()

fmt.Printf(system("%s syncing %d files\n"), prefix, len(adds))
fmt.Printf(system("%s uploading %d files\n"), prefix, len(adds))

for _, sync := range syncs {
var buf bytes.Buffer
Expand All @@ -1256,6 +1266,10 @@ func processAdds(prefix string, adds map[string]bool, lock sync.Mutex, syncs []S

remote := filepath.Join(sync.Remote, rel)

blockRemoteAddsLock.Lock()
blockRemoteAdds[remote] += 1
blockRemoteAddsLock.Unlock()

tgz.WriteHeader(&tar.Header{
Name: remote,
Mode: 0644,
Expand Down Expand Up @@ -1308,7 +1322,7 @@ func processRemoves(prefix string, removes map[string]bool, lock sync.Mutex, syn

lock.Lock()

fmt.Printf("%s removing %d files\n", prefix, len(removes))
fmt.Printf(system(fmt.Sprintf("%s removing %d remote files\n", prefix, len(removes))))

for file := range removes {
cmd = append(cmd, file)
Expand Down Expand Up @@ -1387,13 +1401,9 @@ func (m *Manifest) syncBack() error {
containers[sync.Container] = append(containers[sync.Container], sync.Remote)
}

fmt.Printf("containers = %+v\n", containers)

dc, _ := docker.NewClientFromEnv()

for container, dirs := range containers {
fmt.Printf("container = %+v\n", container)

exec, err := dc.CreateExec(docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
Expand Down Expand Up @@ -1432,10 +1442,14 @@ func (m *Manifest) syncBack() error {
}

var (
remoteAdds = map[string]string{}
remoteRemoves = map[string]string{}
remoteAddLock sync.Mutex
remoteRemoveLock sync.Mutex
remoteAdds = map[string]string{}
remoteRemoves = map[string]string{}
remoteAddLock sync.Mutex
remoteRemoveLock sync.Mutex
blockLocalAdds = map[string]int{}
blockLocalAddsLock sync.Mutex
blockRemoteAdds = map[string]int{}
blockRemoteAddsLock sync.Mutex
)

func (m *Manifest) scanRemoteChanges(container string, r io.Reader) {
Expand All @@ -1445,22 +1459,30 @@ func (m *Manifest) scanRemoteChanges(container string, r io.Reader) {
parts := strings.Split(scanner.Text(), "|")

if len(parts) == 3 {
fmt.Printf("syncs = %+v\n", syncs)

switch parts[0] {
case "add":
for _, sync := range syncs {
if sync.Remote == parts[1] {
remoteAddLock.Lock()
remoteAdds[filepath.Join(sync.Local, parts[2])] = filepath.Join(parts[1], parts[2])
remoteAddLock.Unlock()
for _, sync := range syncs {
local := filepath.Join(sync.Local, parts[2])
remote := filepath.Join(parts[1], parts[2])

if sync.Remote == parts[1] {
switch parts[0] {
case "add":
blockRemoteAddsLock.Lock()

if blockRemoteAdds[remote] > 0 {
blockRemoteAdds[remote] -= 1
} else {
remoteAddLock.Lock()
remoteAdds[local] = remote
remoteAddLock.Unlock()
}

blockRemoteAddsLock.Unlock()
case "delete":
remoteRemoveLock.Lock()
remoteRemoves[filepath.Join(sync.Local, parts[2])] = filepath.Join(parts[1], parts[2])
remoteRemoveLock.Unlock()
}
}
case "remove":
remoteRemoveLock.Lock()
file := "foo"
remoteRemoves[file] = ""
remoteRemoveLock.Unlock()
}
}
}
Expand All @@ -1474,18 +1496,32 @@ func (m *Manifest) processRemoteChanges(container string) {
for {
remoteAddLock.Lock()

num := 0
adds := map[string]string{}

for local, remote := range remoteAdds {
fmt.Printf("add %s %s\n", local, remote)
adds[local] = remote
delete(remoteAdds, local)
num += 1

if num >= 1000 {
break
}
}

remoteAddLock.Unlock()

go m.downloadRemoteAdds(container, adds)

remoteRemoveLock.Lock()

for local, remote := range remoteRemoves {
fmt.Printf("remove %s %s\n", local, remote)
delete(remoteRemoves, local)
if len(remoteRemoves) > 0 {
fmt.Printf(system("%s removing %d local files\n"), m.systemPrefix(), len(remoteRemoves))

for local, _ := range remoteRemoves {
os.Remove(local)
delete(remoteRemoves, local)
}
}

remoteRemoveLock.Unlock()
Expand All @@ -1494,6 +1530,85 @@ func (m *Manifest) processRemoteChanges(container string) {
}
}

func (m *Manifest) downloadRemoteAdds(container string, adds map[string]string) {
if len(adds) == 0 {
return
}

remotes := []string{}
locals := map[string]string{}

for local, remote := range adds {
remotes = append(remotes, remote)
locals[remote] = local
}

fmt.Printf(system("%s downloading %d files\n"), m.systemPrefix(), len(remotes))

args := append([]string{"exec", "-i", container, "tar", "czf", "-"}, remotes...)

data, err := Execer("docker", args...).Output()

if err != nil {
fmt.Printf("err = %+v\n", err)
return
}

gz, err := gzip.NewReader(bytes.NewBuffer(data))

if err != nil {
fmt.Printf("err = %+v\n", err)
return
}

tr := tar.NewReader(gz)

for {
header, err := tr.Next()

if err == io.EOF {
break
}

if err != nil {
fmt.Printf("err = %+v\n", err)
continue
}

switch header.Typeflag {
case tar.TypeReg:
if local, ok := locals[filepath.Join("/", header.Name)]; ok {
if err := os.MkdirAll(filepath.Dir(local), 0755); err != nil {
fmt.Printf("err = %+v\n", err)
continue
}

if _, err := os.Stat(local); !os.IsNotExist(err) {
if err := os.Remove(local); err != nil {
fmt.Printf("err = %+v\n", err)
continue
}
}

blockLocalAddsLock.Lock()
blockLocalAdds[local] += 1
blockLocalAddsLock.Unlock()

fd, err := os.OpenFile(local, os.O_CREATE, os.FileMode(header.Mode))

if err != nil {
fmt.Printf("err = %+v\n", err)
continue
}

io.Copy(fd, tr)

fd.Close()
}
}
}
}

func (m *Manifest) syncFiles() error {
watches := map[string][]Sync{}
candidates := []string{}
Expand Down

0 comments on commit 8ce8ffd

Please sign in to comment.