Skip to content

Commit

Permalink
Provide callback progress hooks for long copies.
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Hollensbe <github@hollensbe.org>
  • Loading branch information
Erik Hollensbe committed Feb 23, 2017
1 parent 3ff6510 commit ac3f354
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
30 changes: 27 additions & 3 deletions copy/copy.go
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"reflect"
"time"

pb "gopkg.in/cheggaaa/pb.v1"

Expand Down Expand Up @@ -46,6 +47,8 @@ type imageCopier struct {
diffIDsAreNeeded bool
canModifyManifest bool
reportWriter io.Writer
progressInterval time.Duration
progress chan types.ProgressProperties
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
Expand Down Expand Up @@ -93,14 +96,22 @@ type Options struct {
ReportWriter io.Writer
SourceCtx *types.SystemContext
DestinationCtx *types.SystemContext
ProgressInterval time.Duration // time to wait between reports to signal the progress channel
Progress chan types.ProgressProperties // Reported to when ProgressInterval has arrived for a single artifact+offset.
}

// Image copies image from srcRef to destRef, using policyContext to validate source image admissibility.
func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageReference, options *Options) error {
if options == nil {
options = &Options{}
}

reportWriter := ioutil.Discard
if options != nil && options.ReportWriter != nil {

if options.ReportWriter != nil {
reportWriter = options.ReportWriter
}

writeReport := func(f string, a ...interface{}) {
fmt.Fprintf(reportWriter, f, a...)
}
Expand Down Expand Up @@ -139,7 +150,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
}

var sigs [][]byte
if options != nil && options.RemoveSignatures {
if options.RemoveSignatures {
sigs = [][]byte{}
} else {
writeReport("Getting image source signatures\n")
Expand Down Expand Up @@ -174,6 +185,8 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates),
canModifyManifest: canModifyManifest,
reportWriter: reportWriter,
progressInterval: options.ProgressInterval,
progress: options.Progress,
}

if err := ic.copyLayers(); err != nil {
Expand All @@ -200,7 +213,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return err
}

if options != nil && options.SignBy != "" {
if options.SignBy != "" {
mech, err := signature.NewGPGSigningMechanism()
if err != nil {
return errors.Wrap(err, "Error initializing GPG")
Expand Down Expand Up @@ -490,6 +503,17 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo
inputInfo.Size = -1
}

// === Report progress using the ic.progress channel, if required.
if ic.progress != nil && ic.progressInterval > 0 {
destStream = &progressReader{
source: destStream,
channel: ic.progress,
interval: ic.progressInterval,
artifact: srcInfo,
lastTime: time.Now(),
}
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions copy/progress_reader.go
@@ -0,0 +1,28 @@
package copy

import (
"io"
"time"

"github.com/containers/image/types"
)

// progressReader is a reader that reports its progress on an interval.
type progressReader struct {
source io.Reader
channel chan types.ProgressProperties
interval time.Duration
artifact types.BlobInfo
lastTime time.Time
offset uint64
}

func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.source.Read(p)
r.offset += uint64(n)
if time.Since(r.lastTime) > r.interval {
r.channel <- types.ProgressProperties{Artifact: r.artifact, Offset: r.offset}
r.lastTime = time.Now()
}
return n, err
}
7 changes: 7 additions & 0 deletions types/types.go
Expand Up @@ -294,6 +294,13 @@ type SystemContext struct {
DockerDisableV1Ping bool
}

// ProgressProperties is used to pass information from the copy code to a monitor which
// can use the real-time information to produce output or react to changes.
type ProgressProperties struct {
Artifact BlobInfo
Offset uint64
}

var (
// ErrBlobNotFound can be returned by an ImageDestination's HasBlob() method
ErrBlobNotFound = errors.New("no such blob present")
Expand Down

0 comments on commit ac3f354

Please sign in to comment.