Skip to content

Commit

Permalink
Provide callback progress hooks for long copies.
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Hollensbe <github@hollensbe.org>
  • Loading branch information
Erik Hollensbe committed Feb 6, 2017
1 parent 1c202c5 commit 8f93af5
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 5 deletions.
19 changes: 19 additions & 0 deletions copy/copy.go
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"reflect"
"time"

pb "gopkg.in/cheggaaa/pb.v1"

Expand Down Expand Up @@ -92,6 +93,8 @@ type Options struct {
ReportWriter io.Writer
SourceCtx *types.SystemContext
DestinationCtx *types.SystemContext
SignalInterval time.Duration // time to wait between reports to the Signal
Signal chan types.SignalProperties // Reported to when SignalInterval has arrived
}

// Image copies image from srcRef to destRef, using policyContext to validate source image admissibility.
Expand All @@ -104,6 +107,22 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
fmt.Fprintf(reportWriter, f, a...)
}

if options.SignalInterval > 0 && options.Signal != nil {
// force a context if there isn't one
if options.SourceCtx == nil {
options.SourceCtx = &types.SystemContext{}
}

if options.DestinationCtx == nil {
options.DestinationCtx = &types.SystemContext{}
}

for _, ctx := range []*types.SystemContext{options.SourceCtx, options.DestinationCtx} {
ctx.SignalInterval = options.SignalInterval
ctx.Signal = options.Signal
}
}

dest, err := destRef.NewImageDestination(options.DestinationCtx)
if err != nil {
return errors.Wrapf(err, "Error initializing destination %s", transports.ImageName(destRef))
Expand Down
23 changes: 20 additions & 3 deletions docker/daemon/daemon_dest.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/containers/image/docker/reference"
"github.com/containers/image/manifest"
"github.com/containers/image/streamcopy"
"github.com/containers/image/types"
"github.com/docker/docker/client"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -53,7 +54,7 @@ func newImageDestination(systemCtx *types.SystemContext, ref daemonReference) (t
statusChannel := make(chan error, 1)

ctx, goroutineCancel := context.WithCancel(context.Background())
go imageLoadGoroutine(ctx, c, reader, statusChannel)
go imageLoadGoroutine(ctx, systemCtx, c, reader, statusChannel, ref.ref.String())

return &daemonImageDestination{
ref: ref,
Expand All @@ -68,25 +69,40 @@ func newImageDestination(systemCtx *types.SystemContext, ref daemonReference) (t
}

// imageLoadGoroutine accepts tar stream on reader, sends it to c, and reports error or success by writing to statusChannel
func imageLoadGoroutine(ctx context.Context, c *client.Client, reader *io.PipeReader, statusChannel chan<- error) {
func imageLoadGoroutine(ctx context.Context, sysctx *types.SystemContext, c *client.Client, reader *io.PipeReader, statusChannel chan<- error, ref string) {
err := errors.New("Internal error: unexpected panic in imageLoadGoroutine")
ourReader := reader

defer func() {
logrus.Debugf("docker-daemon: sending done, status %v", err)
statusChannel <- err
}()
defer func() {
if err == nil {
reader.Close()
ourReader.Close()
} else {
reader.CloseWithError(err)
ourReader.CloseWithError(err)
}
}()

resp, err := c.ImageLoad(ctx, reader, true)
if sysctx != nil {
if sysctx.Signal != nil && sysctx.SignalInterval > 0 {
var w *io.PipeWriter
ourReader, w = io.Pipe()
go func() {
w.CloseWithError(streamcopy.WithProgress(w, reader, ref, sysctx.SignalInterval, sysctx.Signal))
}()
}
}

resp, err := c.ImageLoad(ctx, ourReader, true)
if err != nil {
err = errors.Wrap(err, "Error saving image to docker engine")
return
}

defer resp.Body.Close()
}

Expand Down Expand Up @@ -282,6 +298,7 @@ func (d *daemonImageDestination) sendFile(path string, expectedSize int64, strea
if err := d.tar.WriteHeader(hdr); err != nil {
return err
}

size, err := io.Copy(d.tar, stream)
if err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions docker/daemon/daemon_src.go
Expand Up @@ -10,6 +10,7 @@ import (
"path"

"github.com/containers/image/manifest"
"github.com/containers/image/streamcopy"
"github.com/containers/image/types"
"github.com/docker/docker/client"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -73,8 +74,14 @@ func newImageSource(ctx *types.SystemContext, ref daemonReference) (types.ImageS
}
}()

if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
return nil, err
if ctx == nil {
if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
return nil, err
}
} else {
if err := streamcopy.WithProgress(tarCopyFile, inputStream, ref.ref.String(), ctx.SignalInterval, ctx.Signal); err != nil {
return nil, err
}
}

succeeded = true
Expand Down
55 changes: 55 additions & 0 deletions streamcopy/copy.go
@@ -0,0 +1,55 @@
package streamcopy

import (
"bufio"
"io"
"time"

"github.com/containers/image/types"
)

const megaByte = float64(1024 * 1024)
const readerSize = 32768

// WithProgress implements io.Copy with a buffered reader, then measures
// reports to the offsetFunc with the count processed at the interval.
// If io.EOF is not returned then the error is returned. Otherwise, it is nil.
func WithProgress(writer io.Writer, reader io.Reader, artifact string, interval time.Duration, signal chan types.SignalProperties) error {
rd := bufio.NewReaderSize(reader, readerSize)

count := uint64(0)
buf := make([]byte, readerSize)
t := time.Now()
for {
rn, err := rd.Read(buf)
count += uint64(rn)

if err == io.EOF {
if rn > 0 {
goto write
} else {
return nil
}
}
if err != nil {
return err
}

if interval > 0 && signal != nil {
if time.Since(t) > interval {
signal <- types.SignalProperties{Delta: count, Artifact: artifact}
t = time.Now()
}
}

write:
_, werr := writer.Write(buf[:rn])
if werr != nil {
return werr
}

if err == io.EOF {
return nil
}
}
}
10 changes: 10 additions & 0 deletions types/types.go
Expand Up @@ -293,6 +293,16 @@ type SystemContext struct {
// Note that this field is used mainly to integrate containers/image into projectatomic/docker
// in order to not break any existing docker's integration tests.
DockerDisableV1Ping bool

SignalInterval time.Duration // time to wait between reports to the Signal
Signal chan SignalProperties // Reported to when SignalInterval has arrived
}

// SignalProperties is used to pass information from the copy code to a monitor which
// can use the real-time information to produce output or react to changes.
type SignalProperties struct {
Artifact string
Delta uint64
}

var (
Expand Down

0 comments on commit 8f93af5

Please sign in to comment.