Skip to content

Commit

Permalink
Provide callback progress hooks for long copies.
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Hollensbe <github@hollensbe.org>
  • Loading branch information
Erik Hollensbe committed Jan 20, 2017
1 parent d97b405 commit 64293b8
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 5 deletions.
21 changes: 18 additions & 3 deletions docker/daemon/daemon_dest.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/containers/image/docker/reference"
"github.com/containers/image/manifest"
"github.com/containers/image/streamcopy"
"github.com/containers/image/types"
"github.com/docker/engine-api/client"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -53,7 +54,7 @@ func newImageDestination(systemCtx *types.SystemContext, ref daemonReference) (t
statusChannel := make(chan error, 1)

ctx, goroutineCancel := context.WithCancel(context.Background())
go imageLoadGoroutine(ctx, c, reader, statusChannel)
go imageLoadGoroutine(ctx, systemCtx, c, reader, statusChannel)

return &daemonImageDestination{
ref: ref,
Expand All @@ -68,25 +69,38 @@ func newImageDestination(systemCtx *types.SystemContext, ref daemonReference) (t
}

// imageLoadGoroutine accepts tar stream on reader, sends it to c, and reports error or success by writing to statusChannel
func imageLoadGoroutine(ctx context.Context, c *client.Client, reader *io.PipeReader, statusChannel chan<- error) {
func imageLoadGoroutine(ctx context.Context, sysctx *types.SystemContext, c *client.Client, reader *io.PipeReader, statusChannel chan<- error) {
err := errors.New("Internal error: unexpected panic in imageLoadGoroutine")
ourReader := reader

defer func() {
logrus.Debugf("docker-daemon: sending done, status %v", err)
statusChannel <- err
}()
defer func() {
if err == nil {
reader.Close()
ourReader.Close()
} else {
reader.CloseWithError(err)
ourReader.CloseWithError(err)
}
}()

resp, err := c.ImageLoad(ctx, reader, true)
if sysctx.CopyHook != nil && sysctx.CopyHookInterval > 0 {
var w *io.PipeWriter
ourReader, w = io.Pipe()
go func() {
w.CloseWithError(streamcopy.WithProgress(w, reader, sysctx.CopyHookInterval, sysctx.CopyHook))
}()
}

resp, err := c.ImageLoad(ctx, ourReader, true)
if err != nil {
err = errors.Wrap(err, "Error saving image to docker engine")
return
}

defer resp.Body.Close()
}

Expand Down Expand Up @@ -275,6 +289,7 @@ func (d *daemonImageDestination) sendFile(path string, expectedSize int64, strea
if err := d.tar.WriteHeader(hdr); err != nil {
return err
}

size, err := io.Copy(d.tar, stream)
if err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions docker/daemon/daemon_src.go
Expand Up @@ -10,6 +10,7 @@ import (
"path"

"github.com/containers/image/manifest"
"github.com/containers/image/streamcopy"
"github.com/containers/image/types"
"github.com/docker/engine-api/client"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -73,8 +74,14 @@ func newImageSource(ctx *types.SystemContext, ref daemonReference) (types.ImageS
}
}()

if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
return nil, err
if ctx == nil {
if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
return nil, err
}
} else {
if err := streamcopy.WithProgress(tarCopyFile, inputStream, ctx.CopyHookInterval, ctx.CopyHook); err != nil {
return nil, err
}
}

succeeded = true
Expand Down
53 changes: 53 additions & 0 deletions streamcopy/copy.go
@@ -0,0 +1,53 @@
package streamcopy

import (
"bufio"
"io"
"time"
)

const megaByte = float64(1024 * 1024)
const readerSize = 32768

// WithProgress implements io.Copy with a buffered reader, then measures
// reports to the offsetFunc with the count processed at the interval.
// If io.EOF is not returned then the error is returned. Otherwise, it is nil.
func WithProgress(writer io.Writer, reader io.Reader, interval time.Duration, offsetFunc func(uint64)) error {
rd := bufio.NewReaderSize(reader, readerSize)

count := uint64(0)
buf := make([]byte, readerSize)
t := time.Now()
for {
rn, err := rd.Read(buf)
count += uint64(rn)

if err == io.EOF {
if rn > 0 {
goto write
} else {
return nil
}
}
if err != nil {
return err
}

if interval > 0 && offsetFunc != nil {
if time.Since(t) > interval {
offsetFunc(count)
t = time.Now()
}
}

write:
_, werr := writer.Write(buf[:rn])
if werr != nil {
return werr
}

if err == io.EOF {
return nil
}
}
}
3 changes: 3 additions & 0 deletions types/types.go
Expand Up @@ -293,6 +293,9 @@ type SystemContext struct {
// Note that this field is used mainly to integrate containers/image into projectatomic/docker
// in order to not break any existing docker's integration tests.
DockerDisableV1Ping bool

CopyHookInterval time.Duration // time to wait between reports to the CopyHook
CopyHook func(offset uint64) // Reported to when CopyReportInterval has expired during a copy operation
}

var (
Expand Down

0 comments on commit 64293b8

Please sign in to comment.