Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/1.0] archive, cio, cmd, linux: use buffer pools #2051

Merged
merged 1 commit into from Jan 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 4 additions & 6 deletions archive/tar.go
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/pkg/errors"
)

var bufferPool = &sync.Pool{
var bufPool = &sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32*1024)
return &buffer
Expand Down Expand Up @@ -423,9 +423,7 @@ func (cw *changeWriter) HandleChange(k fs.ChangeKind, p string, f os.FileInfo, e
}
defer file.Close()

buf := bufferPool.Get().(*[]byte)
n, err := io.CopyBuffer(cw.tw, file, *buf)
bufferPool.Put(buf)
n, err := copyBuffered(context.TODO(), cw.tw, file)
if err != nil {
return errors.Wrap(err, "failed to copy")
}
Expand Down Expand Up @@ -578,8 +576,8 @@ func (cw *changeWriter) includeParents(hdr *tar.Header) error {
}

func copyBuffered(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error) {
buf := bufferPool.Get().(*[]byte)
defer bufferPool.Put(buf)
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)

for {
select {
Expand Down
9 changes: 8 additions & 1 deletion cio/io.go
Expand Up @@ -8,7 +8,14 @@ import (
"sync"
)

// Config holds the io configurations.
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}

// Config holds the IO configurations.
type Config struct {
// Terminal is true if one has been allocated
Terminal bool
Expand Down
13 changes: 10 additions & 3 deletions cio/io_unix.go
Expand Up @@ -53,7 +53,9 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
}
set = append(set, f)
go func(w io.WriteCloser) {
io.Copy(w, ioset.in)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(w, ioset.in, *p)
w.Close()
}(f)

Expand All @@ -63,7 +65,9 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
set = append(set, f)
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.out, r)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(ioset.out, r, *p)
r.Close()
wg.Done()
}(f)
Expand All @@ -76,7 +80,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
if !tty {
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.err, r)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(ioset.err, r, *p)
r.Close()
wg.Done()
}(f)
Expand Down
16 changes: 13 additions & 3 deletions cio/io_windows.go
Expand Up @@ -46,7 +46,11 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In)
return
}
io.Copy(c, ioset.in)

p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(c, ioset.in, *p)
c.Close()
l.Close()
}()
Expand All @@ -72,7 +76,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out)
return
}
io.Copy(ioset.out, c)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(ioset.out, c, *p)
c.Close()
l.Close()
}()
Expand All @@ -98,7 +105,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err)
return
}
io.Copy(ioset.err, c)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(ioset.err, c, *p)
c.Close()
l.Close()
}()
Expand Down
6 changes: 2 additions & 4 deletions cmd/containerd/main.go
Expand Up @@ -10,17 +10,15 @@ import (
"os/signal"
"time"

"google.golang.org/grpc/grpclog"

gocontext "golang.org/x/net/context"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/server"
"github.com/containerd/containerd/sys"
"github.com/containerd/containerd/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
gocontext "golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
)

const usage = `
Expand Down
5 changes: 2 additions & 3 deletions cmd/containerd/main_unix.go
Expand Up @@ -7,11 +7,10 @@ import (
"os"
"runtime"

"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/server"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)

const defaultConfigPath = "/etc/containerd/config.toml"
Expand Down
11 changes: 10 additions & 1 deletion cmd/ctr/commands/shim/io_unix.go
Expand Up @@ -12,6 +12,13 @@ import (
"golang.org/x/sys/unix"
)

var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}

func prepareStdio(stdin, stdout, stderr string, console bool) (wg *sync.WaitGroup, err error) {
wg = &sync.WaitGroup{}
ctx := gocontext.Background()
Expand All @@ -26,7 +33,9 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (wg *sync.WaitGrou
}
}(f)
go func(w io.WriteCloser) {
io.Copy(w, os.Stdin)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(w, os.Stdin, *p)
w.Close()
}(f)

Expand Down
10 changes: 2 additions & 8 deletions linux/bundle.go
Expand Up @@ -3,9 +3,8 @@
package linux

import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
"path/filepath"

Expand Down Expand Up @@ -52,12 +51,7 @@ func newBundle(id, path, workDir string, spec []byte) (b *bundle, err error) {
if err := os.Mkdir(filepath.Join(path, "rootfs"), 0711); err != nil {
return nil, err
}
f, err := os.Create(filepath.Join(path, configFilename))
if err != nil {
return nil, err
}
defer f.Close()
_, err = io.Copy(f, bytes.NewReader(spec))
err = ioutil.WriteFile(filepath.Join(path, configFilename), spec, 0666)
return &bundle{
id: id,
path: path,
Expand Down
21 changes: 18 additions & 3 deletions linux/proc/io.go
Expand Up @@ -13,14 +13,23 @@ import (
runc "github.com/containerd/go-runc"
)

var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}

func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
for name, dest := range map[string]func(wc io.WriteCloser, rc io.Closer){
stdout: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(wc, rio.Stdout())
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(wc, rio.Stdout(), *p)
wg.Done()
wc.Close()
rc.Close()
Expand All @@ -31,7 +40,10 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(wc, rio.Stderr())
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(wc, rio.Stderr(), *p)
wg.Done()
wc.Close()
rc.Close()
Expand Down Expand Up @@ -59,7 +71,10 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(rio.Stdin(), f)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(rio.Stdin(), f, *p)
rio.Stdin().Close()
f.Close()
}()
Expand Down
5 changes: 4 additions & 1 deletion linux/proc/utils.go
Expand Up @@ -66,7 +66,10 @@ func copyFile(to, from string) error {
return err
}
defer tt.Close()
_, err = io.Copy(tt, ff)

p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
_, err = io.CopyBuffer(tt, ff, *p)
return err
}

Expand Down
10 changes: 9 additions & 1 deletion linux/shim/service.go
Expand Up @@ -29,7 +29,15 @@ import (
"google.golang.org/grpc/status"
)

var empty = &ptypes.Empty{}
var (
empty = &ptypes.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}
)

// Config contains shim specific configuration
type Config struct {
Expand Down
8 changes: 6 additions & 2 deletions linux/shim/service_linux.go
Expand Up @@ -33,7 +33,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(epollConsole, in)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(epollConsole, in, *p)
}()
}

Expand All @@ -49,7 +51,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outw, epollConsole)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
epollConsole.Close()
outr.Close()
outw.Close()
Expand Down
10 changes: 8 additions & 2 deletions linux/shim/service_unix.go
Expand Up @@ -24,7 +24,10 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console,
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(console, in)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(console, in, *p)
}()
}
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
Expand All @@ -39,7 +42,10 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console,
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outw, console)
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)

io.CopyBuffer(outw, console, *p)
console.Close()
outr.Close()
outw.Close()
Expand Down