Skip to content
This repository has been archived by the owner on Jan 25, 2022. It is now read-only.

Commit

Permalink
use tar instead of rsync for streaming in and out
Browse files Browse the repository at this point in the history
this switches the wire protocol to .tar exclusively

Signed-off-by: Max Brunsfeld <mbrunsfeld@pivotallabs.com>
  • Loading branch information
Alex Suraci authored and Max Brunsfeld committed May 5, 2014
1 parent a4e6ea5 commit c1da05b
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 131 deletions.
88 changes: 60 additions & 28 deletions integration/lifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package lifecycle_test

import (
"archive/tar"
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"os"
"os/user"
"path/filepath"
"time"

"github.com/cloudfoundry-incubator/garden/warden"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
archiver "github.com/pivotal-golang/archiver/extractor/test_helper"
)

var _ = Describe("Creating a container", func() {
Expand Down Expand Up @@ -174,25 +178,38 @@ wait
})
})

Context("and copying files in", func() {
var path string
Context("and streaming files in", func() {
var tarStream io.Reader

BeforeEach(func() {
tmpdir, err := ioutil.TempDir("", "some-temp-dir-parent")
Ω(err).ShouldNot(HaveOccurred())

path = filepath.Join(tmpdir, "some-temp-dir")
tgzPath := filepath.Join(tmpdir, "some.tgz")

archiver.CreateTarGZArchive(
tgzPath,
[]archiver.ArchiveFile{
{
Name: "./some-temp-dir",
Dir: true,
},
{
Name: "./some-temp-dir/some-temp-file",
Body: "some-body",
},
},
)

err = os.MkdirAll(path, 0755)
tgz, err := os.Open(tgzPath)
Ω(err).ShouldNot(HaveOccurred())

err = ioutil.WriteFile(filepath.Join(path, "some-temp-file"), []byte("HGJMT<"), 0755)
tarStream, err = gzip.NewReader(tgz)
Ω(err).ShouldNot(HaveOccurred())

})

It("creates the files in the container", func() {
err := container.CopyIn(path, "/tmp/some-container-dir")
err := container.StreamIn(tarStream, "/tmp/some-container-dir")
Ω(err).ShouldNot(HaveOccurred())

_, stream, err := container.Run(warden.ProcessSpec{
Expand All @@ -202,38 +219,53 @@ wait
Expect(*(<-stream).ExitStatus).To(Equal(uint32(42)))
})

Context("with a strailing slash on the destination", func() {
It("does what rsync does (syncs contents)", func() {
err := container.CopyIn(path+"/", "/tmp/some-container-dir/")
Ω(err).ShouldNot(HaveOccurred())

_, stream, err := container.Run(warden.ProcessSpec{
Script: `test -f /tmp/some-container-dir/some-temp-file && exit 42`,
})

Expect(*(<-stream).ExitStatus).To(Equal(uint32(42)))
})
})

Context("and then copying them out", func() {
It("copies the files to the host", func() {
It("streams the directory", func() {
_, stream, err := container.Run(warden.ProcessSpec{
Script: `mkdir -p some-container-dir; touch some-container-dir/some-file;`,
Script: `mkdir -p some-outer-dir/some-inner-dir; touch some-outer-dir/some-inner-dir/some-file;`,
})

Expect(*(<-stream).ExitStatus).To(Equal(uint32(0)))

tmpdir, err := ioutil.TempDir("", "copy-out-temp-dir-parent")
Ω(err).ShouldNot(HaveOccurred())
tarBuffer := new(bytes.Buffer)

user, err := user.Current()
err = container.StreamOut("some-outer-dir/some-inner-dir", tarBuffer)
Ω(err).ShouldNot(HaveOccurred())

err = container.CopyOut("some-container-dir", tmpdir, user.Username)
tarReader := tar.NewReader(tarBuffer)

header, err := tarReader.Next()
Ω(err).ShouldNot(HaveOccurred())
Ω(header.Name).Should(Equal("some-inner-dir/"))

_, err = os.Stat(filepath.Join(tmpdir, "some-container-dir", "some-file"))
header, err = tarReader.Next()
Ω(err).ShouldNot(HaveOccurred())
Ω(header.Name).Should(Equal("some-inner-dir/some-file"))
})

Context("with a trailing slash", func() {
It("streams the contents of the directory", func() {
_, stream, err := container.Run(warden.ProcessSpec{
Script: `mkdir -p some-container-dir; touch some-container-dir/some-file;`,
})

Expect(*(<-stream).ExitStatus).To(Equal(uint32(0)))

tarBuffer := new(bytes.Buffer)

err = container.StreamOut("some-container-dir/", tarBuffer)
Ω(err).ShouldNot(HaveOccurred())

tarReader := tar.NewReader(tarBuffer)

header, err := tarReader.Next()
Ω(err).ShouldNot(HaveOccurred())
Ω(header.Name).Should(Equal("./"))

header, err = tarReader.Next()
Ω(err).ShouldNot(HaveOccurred())
Ω(header.Name).Should(Equal("./some-file"))
})
})
})
})
Expand Down
54 changes: 27 additions & 27 deletions linux_backend/linux_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -429,47 +429,47 @@ func (c *LinuxContainer) CopyOut(src, dst, owner string) error {
func (c *LinuxContainer) StreamIn(src io.Reader, dstPath string) error {
log.Println(c.id, "writing data to: ", dstPath)

tempfile, err := ioutil.TempFile("", "stream-in")
if err != nil {
return err
}

_, err = io.Copy(tempfile, src)
if err != nil {
return err
}

wshPath := path.Join(c.path, "bin", "wsh")
sockPath := path.Join(c.path, "run", "wshd.sock")

wsh := &exec.Cmd{
tar := &exec.Cmd{
Path: wshPath,
Args: []string{"--socket", sockPath, "--user", "vcap", "mkdir", "-p", path.Dir(dstPath)},
}

err = c.runner.Run(wsh)
if err != nil {
return err
Args: []string{
"--socket", sockPath,
"--user", "vcap",
"bash", "-c",
fmt.Sprintf("mkdir -p %s && tar xf - -C %s", dstPath, dstPath),
},
Stdin: src,
}

return c.rsync(tempfile.Name(), "vcap@container:"+dstPath)
return c.runner.Run(tar)
}

func (c *LinuxContainer) StreamOut(srcPath string, dst io.Writer) error {
log.Println(c.id, "reading data from: ", srcPath)

tempfile, err := ioutil.TempFile("", "stream-out")
if err != nil {
return err
wshPath := path.Join(c.path, "bin", "wsh")
sockPath := path.Join(c.path, "run", "wshd.sock")

workingDir := filepath.Dir(srcPath)
compressArg := filepath.Base(srcPath)
if strings.HasSuffix(srcPath, "/") {
workingDir = srcPath
compressArg = "."
}

err = c.rsync("vcap@container:"+srcPath, tempfile.Name())
if err != nil {
return err
tar := &exec.Cmd{
Path: wshPath,
Args: []string{
"--socket", sockPath,
"--user", "vcap",
"tar", "cf", "-", "-C", workingDir, compressArg,
},
Stdout: dst,
}

_, err = io.Copy(dst, tempfile)
return err
return c.runner.Run(tar)
}

func (c *LinuxContainer) LimitBandwidth(limits warden.BandwidthLimits) error {
Expand Down

0 comments on commit c1da05b

Please sign in to comment.