Skip to content

Commit

Permalink
Provide callback progress hooks for long copies.
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Hollensbe <github@hollensbe.org>
  • Loading branch information
Erik Hollensbe committed Feb 7, 2017
1 parent 1c202c5 commit c0cb039
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
24 changes: 24 additions & 0 deletions copy/copy.go
Expand Up @@ -7,13 +7,15 @@ import (
"io"
"io/ioutil"
"reflect"
"time"

pb "gopkg.in/cheggaaa/pb.v1"

"github.com/Sirupsen/logrus"
"github.com/containers/image/image"
"github.com/containers/image/manifest"
"github.com/containers/image/signature"
"github.com/containers/image/streamcopy"
"github.com/containers/image/transports"
"github.com/containers/image/types"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -45,6 +47,8 @@ type imageCopier struct {
diffIDsAreNeeded bool
canModifyManifest bool
reportWriter io.Writer
progressInterval time.Duration
progress chan types.ProgressProperties
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
Expand Down Expand Up @@ -92,6 +96,8 @@ type Options struct {
ReportWriter io.Writer
SourceCtx *types.SystemContext
DestinationCtx *types.SystemContext
ProgressInterval time.Duration // time to wait between reports to signal the progress channel
Progress chan types.ProgressProperties // Reported to when ProgressInterval has arrived for a single artifact+offset.
}

// Image copies image from srcRef to destRef, using policyContext to validate source image admissibility.
Expand Down Expand Up @@ -162,6 +168,10 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return err
}

if options == nil {
options = &Options{}
}

// If src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates) will be true, it needs to be true by the time we get here.
ic := imageCopier{
copiedBlobs: make(map[digest.Digest]digest.Digest),
Expand All @@ -173,6 +183,8 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates),
canModifyManifest: canModifyManifest,
reportWriter: reportWriter,
progressInterval: options.ProgressInterval,
progress: options.Progress,
}

if err := ic.copyLayers(); err != nil {
Expand Down Expand Up @@ -489,6 +501,18 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo
inputInfo.Size = -1
}

// the pipe is here to facilitate the stream copier which will act as an
// intermediary to signal progress information to any registered channel.
if ic.progress != nil && ic.progressInterval > 0 {
// make the readcloser first, then move forward with the destStream reader
// to satisfy interfaces from other parts of this function. the readcloser
// will be used in the defer later to settle the pipe.
destReadCloser, writer := io.Pipe()
go streamcopy.WithProgress(writer, destStream, srcInfo, ic.progressInterval, ic.progress)
destStream = destReadCloser
defer destReadCloser.Close()
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
if err != nil {
Expand Down
102 changes: 102 additions & 0 deletions copy/progress_test.go
@@ -0,0 +1,102 @@
package copy

import (
"fmt"
"testing"
"time"

"github.com/containers/image/docker/daemon"
"github.com/containers/image/signature"
"github.com/containers/image/types"
"github.com/docker/docker/reference"
)

// THIS TEST SHOULD BE SLOW. Read inner comments for details.
func TestCopyProgress(t *testing.T) {
interval := 100 * time.Millisecond

ref, err := daemon.ParseReference("docker.io/library/golang:latest")
if err != nil {
t.Fatal(err)
}

img, err := ref.NewImage(nil)
if err != nil {
t.Fatal(err)
}
defer img.Close()

tgtRef, err := reference.ParseNamed("docker.io/containers/test:latest")
if err != nil {
t.Fatal(err)
}
tgt, err := daemon.NewReference("", tgtRef)
if err != nil {
t.Fatal(err)
}

if err != nil {
t.Fatal(err)
}

pc, err := signature.NewPolicyContext(&signature.Policy{
Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()},
})

if err != nil {
t.Fatal(err)
}

signal := make(chan types.ProgressProperties)
errChan := make(chan error, 1)

// This validates that enough events arrived to satisfy the interval time. As
// a result it may require a little bit of work to get right before it will
// pass for all people all the time. Sorry in advance!
go func() {
var (
// if this is false, the time will be populated and this will be set to true.
// This is to ensure we're measuring the progress itself and not other
// operations.
timed bool
tm time.Time
count int
)

for range signal {
if !timed {
tm = time.Now().Add(interval * 2) // account for elapsed time already before the chan will tick at all
timed = true
}
count++
}

delta := (time.Since(tm) - time.Duration(count)*interval)
if delta > interval {
errChan <- fmt.Errorf("counts did not equal during progress signaling: (delta: %d) (since: %d) (elapsed: %d)", delta, time.Since(tm), time.Duration(count)*interval)
}

errChan <- nil
}()

err = Image(
pc,
tgt,
ref,
&Options{
ProgressInterval: interval,
Progress: signal,
RemoveSignatures: true,
})

close(signal)

if err != nil {
t.Fatal(err)
}

if err := <-errChan; err != nil {
t.Fatal(err)
}

}
59 changes: 59 additions & 0 deletions streamcopy/copy.go
@@ -0,0 +1,59 @@
package streamcopy

import (
"bufio"
"io"
"time"

"github.com/containers/image/types"
)

const readerSize = 32768

// WithProgress implements io.Copy with a buffered reader, then measures data
// flow and on the interval -- if still running -- reports offset and artifact
// information to the signal via the channel.
func WithProgress(writer *io.PipeWriter, reader io.Reader, artifact types.BlobInfo, interval time.Duration, signal chan types.ProgressProperties) {
defer writer.Close()
rd := bufio.NewReaderSize(reader, readerSize)

count := uint64(0)
buf := make([]byte, readerSize)

t := time.Now()

for {
rn, err := rd.Read(buf)
count += uint64(rn)

if err == io.EOF {
if rn > 0 {
goto write
} else {
return
}
}
if err != nil {
writer.CloseWithError(err)
return
}

if interval > 0 && signal != nil {
if time.Since(t) > interval {
signal <- types.ProgressProperties{Offset: count, Artifact: artifact}
t = time.Now()
}
}

write:
_, werr := writer.Write(buf[:rn])
if werr != nil {
writer.CloseWithError(werr)
return
}

if err == io.EOF {
return
}
}
}
7 changes: 7 additions & 0 deletions types/types.go
Expand Up @@ -295,6 +295,13 @@ type SystemContext struct {
DockerDisableV1Ping bool
}

// ProgressProperties is used to pass information from the copy code to a monitor which
// can use the real-time information to produce output or react to changes.
type ProgressProperties struct {
Artifact BlobInfo
Offset uint64
}

var (
// ErrBlobNotFound can be returned by an ImageDestination's HasBlob() method
ErrBlobNotFound = errors.New("no such blob present")
Expand Down

0 comments on commit c0cb039

Please sign in to comment.