Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bidirectional sync #640

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.env
.git
cmd/changes/changes
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.env
cmd/changes/changes
tags
308 changes: 295 additions & 13 deletions api/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand All @@ -24,6 +25,7 @@ import (
"syscall"
"time"

"github.com/convox/rack/cmd/convox/changes"
"github.com/convox/rack/cmd/convox/templates"
"github.com/fatih/color"
"github.com/fsouza/go-dockerclient"
Expand Down Expand Up @@ -546,7 +548,7 @@ func (m *Manifest) Raw() ([]byte, error) {
return yaml.Marshal(m)
}

func (m *Manifest) Run(app string, cache bool, shift int) []error {
func (m *Manifest) Run(app string, cache, sync bool, shift int) []error {
ch := make(chan error)
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt, os.Kill)
Expand All @@ -567,6 +569,15 @@ func (m *Manifest) Run(app string, cache bool, shift int) []error {
sch := make(chan error)
go (*m)[name].runAsync(m, m.prefixForEntry(name, i), app, name, cache, shift, ch, sch)
<-sch // block until started successfully
if sync {
time.Sleep(1 * time.Second)
m.sync(app, name)
}
}

if sync {
go m.syncFiles()
go m.syncBack()
}

errors := []error{}
Expand All @@ -583,16 +594,52 @@ func (m *Manifest) Run(app string, cache bool, shift int) []error {
return errors
}

func (m *Manifest) Sync(app string) error {
for _, name := range m.runOrder() {
err := (*m)[name].syncAdds(app, name)
func (m *Manifest) sync(app, process string) error {
err := (*m)[process].syncAdds(app, process)

if err != nil {
return err
}
if err != nil {
return err
}

return nil
}

func (me ManifestEntry) copyChangesBinaryToContainer(app, process string) error {
// only sync containers with a build directive
if me.Build == "" {
return nil
}

go m.syncFiles()
dc, _ := docker.NewClientFromEnv()

var buf bytes.Buffer

tgz := tar.NewWriter(&buf)

data, err := changes.Asset("../changes/changes")

if err != nil {
return err
}

tgz.WriteHeader(&tar.Header{
Name: "changes",
Mode: 0755,
Size: int64(len(data)),
})

tgz.Write(data)

tgz.Close()

err = dc.UploadToContainer(containerName(app, process), docker.UploadToContainerOptions{
InputStream: &buf,
Path: "/",
})

if err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -894,6 +941,12 @@ func (me ManifestEntry) syncAdds(app, process string) error {
return nil
}

err := me.copyChangesBinaryToContainer(app, process)

if err != nil {
return err
}

dockerfile := filepath.Join(me.Build, "Dockerfile")

if me.Dockerfile != "" {
Expand Down Expand Up @@ -1160,10 +1213,19 @@ func watchWalker(files Files, local string, adds map[string]bool, lock sync.Mute
}

if files[path] != info.ModTime() {
lock.Lock()
adds[path] = true
files[path] = info.ModTime()
lock.Unlock()

blockLocalAddsLock.Lock()

if blockLocalAdds[path] > 0 {
blockLocalAdds[path] -= 1
} else {
lock.Lock()
adds[path] = true
lock.Unlock()
}

blockLocalAddsLock.Unlock()
}

return nil
Expand All @@ -1182,7 +1244,7 @@ func processAdds(prefix string, adds map[string]bool, lock sync.Mutex, syncs []S

lock.Lock()

fmt.Printf(system("%s syncing %d files\n"), prefix, len(adds))
fmt.Printf(system("%s uploading %d files\n"), prefix, len(adds))

for _, sync := range syncs {
var buf bytes.Buffer
Expand All @@ -1204,6 +1266,10 @@ func processAdds(prefix string, adds map[string]bool, lock sync.Mutex, syncs []S

remote := filepath.Join(sync.Remote, rel)

blockRemoteAddsLock.Lock()
blockRemoteAdds[remote] += 1
blockRemoteAddsLock.Unlock()

tgz.WriteHeader(&tar.Header{
Name: remote,
Mode: 0644,
Expand Down Expand Up @@ -1256,7 +1322,7 @@ func processRemoves(prefix string, removes map[string]bool, lock sync.Mutex, syn

lock.Lock()

fmt.Printf("%s removing %d files\n", prefix, len(removes))
fmt.Printf(system(fmt.Sprintf("%s removing %d remote files\n", prefix, len(removes))))

for file := range removes {
cmd = append(cmd, file)
Expand Down Expand Up @@ -1327,6 +1393,222 @@ func registerSync(container, local, remote string) error {
return nil
}

func (m *Manifest) syncBack() error {
containers := map[string][]string{}

// Group sync directories by container
for _, sync := range syncs {
containers[sync.Container] = append(containers[sync.Container], sync.Remote)
}

dc, _ := docker.NewClientFromEnv()

for container, dirs := range containers {
exec, err := dc.CreateExec(docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Cmd: append([]string{"/changes"}, dirs...),
Container: container,
})

if err != nil {
fmt.Printf("err = %+v\n", err)
return err
}

r, w := io.Pipe()

go m.scanRemoteChanges(container, r)
go m.processRemoteChanges(container)

err = dc.StartExec(exec.ID, docker.StartExecOptions{
Tty: true,
RawTerminal: true,
OutputStream: w,
ErrorStream: w,
})

fmt.Println("terminated")

if err != nil {
fmt.Printf("err = %+v\n", err)
return err
}

}

return nil
}

var (
remoteAdds = map[string]string{}
remoteRemoves = map[string]string{}
remoteAddLock sync.Mutex
remoteRemoveLock sync.Mutex
blockLocalAdds = map[string]int{}
blockLocalAddsLock sync.Mutex
blockRemoteAdds = map[string]int{}
blockRemoteAddsLock sync.Mutex
)

func (m *Manifest) scanRemoteChanges(container string, r io.Reader) {
scanner := bufio.NewScanner(r)

for scanner.Scan() {
parts := strings.Split(scanner.Text(), "|")

if len(parts) == 3 {
for _, sync := range syncs {
local := filepath.Join(sync.Local, parts[2])
remote := filepath.Join(parts[1], parts[2])

if sync.Remote == parts[1] {
switch parts[0] {
case "add":
blockRemoteAddsLock.Lock()

if blockRemoteAdds[remote] > 0 {
blockRemoteAdds[remote] -= 1
} else {
remoteAddLock.Lock()
remoteAdds[local] = remote
remoteAddLock.Unlock()
}

blockRemoteAddsLock.Unlock()
case "delete":
remoteRemoveLock.Lock()
remoteRemoves[filepath.Join(sync.Local, parts[2])] = filepath.Join(parts[1], parts[2])
remoteRemoveLock.Unlock()
}
}
}
}
}

if err := scanner.Err(); err != nil {
fmt.Println("scanner error")
}
}

func (m *Manifest) processRemoteChanges(container string) {
for {
remoteAddLock.Lock()

num := 0
adds := map[string]string{}

for local, remote := range remoteAdds {
adds[local] = remote
delete(remoteAdds, local)
num += 1

if num >= 1000 {
break
}
}

remoteAddLock.Unlock()

go m.downloadRemoteAdds(container, adds)

remoteRemoveLock.Lock()

if len(remoteRemoves) > 0 {
fmt.Printf(system("%s removing %d local files\n"), m.systemPrefix(), len(remoteRemoves))

for local, _ := range remoteRemoves {
os.Remove(local)
delete(remoteRemoves, local)
}
}

remoteRemoveLock.Unlock()

time.Sleep(1 * time.Second)
}
}

func (m *Manifest) downloadRemoteAdds(container string, adds map[string]string) {
if len(adds) == 0 {
return
}

remotes := []string{}
locals := map[string]string{}

for local, remote := range adds {
remotes = append(remotes, remote)
locals[remote] = local
}

fmt.Printf(system("%s downloading %d files\n"), m.systemPrefix(), len(remotes))

args := append([]string{"exec", "-i", container, "tar", "czf", "-"}, remotes...)

data, err := Execer("docker", args...).Output()

if err != nil {
fmt.Printf("err = %+v\n", err)
return
}

gz, err := gzip.NewReader(bytes.NewBuffer(data))

if err != nil {
fmt.Printf("err = %+v\n", err)
return
}

tr := tar.NewReader(gz)

for {
header, err := tr.Next()

if err == io.EOF {
break
}

if err != nil {
fmt.Printf("err = %+v\n", err)
continue
}

switch header.Typeflag {
case tar.TypeReg:
if local, ok := locals[filepath.Join("/", header.Name)]; ok {
if err := os.MkdirAll(filepath.Dir(local), 0755); err != nil {
fmt.Printf("err = %+v\n", err)
continue
}

if _, err := os.Stat(local); !os.IsNotExist(err) {
if err := os.Remove(local); err != nil {
fmt.Printf("err = %+v\n", err)
continue
}
}

blockLocalAddsLock.Lock()
blockLocalAdds[local] += 1
blockLocalAddsLock.Unlock()

fd, err := os.OpenFile(local, os.O_CREATE, os.FileMode(header.Mode))

if err != nil {
fmt.Printf("err = %+v\n", err)
continue
}

io.Copy(fd, tr)

fd.Close()
}
}
}
}

func (m *Manifest) syncFiles() error {
watches := map[string][]Sync{}
candidates := []string{}
Expand Down
Loading