Skip to content

Commit

Permalink
Provide callback progress hooks for long copies.
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Hollensbe <github@hollensbe.org>
  • Loading branch information
Erik Hollensbe committed Feb 7, 2017
1 parent 4f14180 commit 21d2390
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 3 deletions.
33 changes: 30 additions & 3 deletions copy/copy.go
Expand Up @@ -7,13 +7,15 @@ import (
"io"
"io/ioutil"
"reflect"
"time"

pb "gopkg.in/cheggaaa/pb.v1"

"github.com/Sirupsen/logrus"
"github.com/containers/image/image"
"github.com/containers/image/manifest"
"github.com/containers/image/signature"
"github.com/containers/image/streamcopy"
"github.com/containers/image/transports"
"github.com/containers/image/types"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -45,6 +47,8 @@ type imageCopier struct {
diffIDsAreNeeded bool
canModifyManifest bool
reportWriter io.Writer
progressInterval time.Duration
progress chan types.ProgressProperties
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
Expand Down Expand Up @@ -92,12 +96,19 @@ type Options struct {
ReportWriter io.Writer
SourceCtx *types.SystemContext
DestinationCtx *types.SystemContext
ProgressInterval time.Duration // time to wait between reports to signal the progress channel
Progress chan types.ProgressProperties // Reported to when ProgressInterval has arrived for a single artifact+offset.
}

// Image copies image from srcRef to destRef, using policyContext to validate source image admissibility.
func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageReference, options *Options) error {
reportWriter := ioutil.Discard
if options != nil && options.ReportWriter != nil {

if options == nil {
options = &Options{}
}

if options.ReportWriter != nil {
reportWriter = options.ReportWriter
}
writeReport := func(f string, a ...interface{}) {
Expand Down Expand Up @@ -138,7 +149,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
}

var sigs [][]byte
if options != nil && options.RemoveSignatures {
if options.RemoveSignatures {
sigs = [][]byte{}
} else {
writeReport("Getting image source signatures\n")
Expand Down Expand Up @@ -173,6 +184,8 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates),
canModifyManifest: canModifyManifest,
reportWriter: reportWriter,
progressInterval: options.ProgressInterval,
progress: options.Progress,
}

if err := ic.copyLayers(); err != nil {
Expand All @@ -199,7 +212,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return err
}

if options != nil && options.SignBy != "" {
if options.SignBy != "" {
mech, err := signature.NewGPGSigningMechanism()
if err != nil {
return errors.Wrap(err, "Error initializing GPG")
Expand Down Expand Up @@ -489,6 +502,20 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo
inputInfo.Size = -1
}

// === Report progress using the ic.progress channel, if required.
if ic.progress != nil && ic.progressInterval > 0 {
// the pipe is here to facilitate the stream copier which will act as an
// intermediary to signal progress information to any registered channel.
//
// make the readcloser first, then move forward with the destStream reader
// to satisfy interfaces from other parts of this function. the readcloser
// will be used in the defer later to settle the pipe.
destReadCloser, writer := io.Pipe()
go streamcopy.WithProgress(writer, destStream, srcInfo, ic.progressInterval, ic.progress)
destStream = destReadCloser
defer destReadCloser.Close()
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
if err != nil {
Expand Down
112 changes: 112 additions & 0 deletions copy/progress_test.go
@@ -0,0 +1,112 @@
package copy

import (
"fmt"
"os/exec"
"testing"
"time"

"github.com/containers/image/docker/daemon"
"github.com/containers/image/signature"
"github.com/containers/image/types"
"github.com/docker/docker/reference"
)

// THIS TEST SHOULD BE SLOW. Read inner comments for details.
func TestCopyProgress(t *testing.T) {
interval := 100 * time.Millisecond

// we shell out to avoid "client is newer than server" errors from docker
// which will happen when importing the client straight from github.
// This of course presumes docker is in the path.
if out, err := exec.Command("/bin/sh", "-c", "docker pull golang:latest").CombinedOutput(); err != nil {
t.Fatalf("%v: %s", err, out)
}

ref, err := daemon.ParseReference("docker.io/library/golang:latest")
if err != nil {
t.Fatal(err)
}

img, err := ref.NewImage(nil)
if err != nil {
t.Fatal(err)
}
defer img.Close()

tgtRef, err := reference.ParseNamed("docker.io/containers/test:latest")
if err != nil {
t.Fatal(err)
}
tgt, err := daemon.NewReference("", tgtRef)
if err != nil {
t.Fatal(err)
}

if err != nil {
t.Fatal(err)
}

pc, err := signature.NewPolicyContext(&signature.Policy{
Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()},
})

if err != nil {
t.Fatal(err)
}

signal := make(chan types.ProgressProperties)
errChan := make(chan error, 1)

// This validates that enough events arrived to satisfy the interval time. As
// a result it may require a little bit of work to get right before it will
// pass for all people all the time. Sorry in advance!
go func() {
var (
// if this is false, the time will be populated and this will be set to true.
// This is to ensure we're measuring the progress itself and not other
// operations.
timed bool
tm time.Time
count int
)

for range signal {
if !timed {
tm = time.Now().Add(interval * 2) // account for elapsed time already before the chan will tick at all
timed = true
}
count++
}

elapsed := time.Duration(count)*interval + interval // last one
since := time.Since(tm)
delta := since - elapsed
if delta > interval {
errChan <- fmt.Errorf("counts did not equal during progress signaling: (delta: %d) (since: %d) (elapsed: %d)", delta, since, elapsed)
}

errChan <- nil
}()

err = Image(
pc,
tgt,
ref,
&Options{
ProgressInterval: interval,
Progress: signal,
RemoveSignatures: true,
})

close(signal)

if err != nil {
t.Fatal(err)
}

if err := <-errChan; err != nil {
t.Fatal(err)
}

}
54 changes: 54 additions & 0 deletions streamcopy/copy.go
@@ -0,0 +1,54 @@
package streamcopy

import (
"io"
"time"

"github.com/containers/image/types"
)

const readerSize = 32768

// WithProgress implements io.Copy with a buffered reader, then measures data
// flow and on the interval -- if still running -- reports offset and artifact
// information to the signal via the channel.
func WithProgress(writer *io.PipeWriter, reader io.Reader, artifact types.BlobInfo, interval time.Duration, signal chan types.ProgressProperties) {
defer writer.Close()

count := uint64(0)
buf := make([]byte, readerSize)

t := time.Now()

for {
rn, err := reader.Read(buf)
count += uint64(rn)

if rn > 0 && err == io.EOF {
goto write
}

if err != nil {
writer.CloseWithError(err)
return
}

if interval > 0 && signal != nil {
if time.Since(t) > interval {
signal <- types.ProgressProperties{Offset: count, Artifact: artifact}
t = time.Now()
}
}

write:
_, werr := writer.Write(buf[:rn])
if werr != nil {
writer.CloseWithError(werr)
return
}

if err == io.EOF {
return
}
}
}
7 changes: 7 additions & 0 deletions types/types.go
Expand Up @@ -294,6 +294,13 @@ type SystemContext struct {
DockerDisableV1Ping bool
}

// ProgressProperties is used to pass information from the copy code to a monitor which
// can use the real-time information to produce output or react to changes.
type ProgressProperties struct {
Artifact BlobInfo
Offset uint64
}

var (
// ErrBlobNotFound can be returned by an ImageDestination's HasBlob() method
ErrBlobNotFound = errors.New("no such blob present")
Expand Down

0 comments on commit 21d2390

Please sign in to comment.