Skip to content

Commit

Permalink
ctr-remote: ensure cancel cleanly when recieves signals during conver…
Browse files Browse the repository at this point in the history
…sion

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Apr 8, 2022
1 parent 7c59b16 commit b484d42
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 5 deletions.
12 changes: 12 additions & 0 deletions cmd/ctr-remote/commands/convert.go
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"os"
"os/signal"

"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/images/converter"
Expand Down Expand Up @@ -174,6 +175,17 @@ When '--all-platforms' is given all images in a manifest list must be available.
}
defer cancel()

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go func() {
// Cleanly cancel conversion
select {
case s := <-sigCh:
logrus.Infof("Got %v", s)
cancel()
case <-ctx.Done():
}
}()
newImg, err := converter.Convert(ctx, client, targetRef, srcRef, convertOpts...)
if err != nil {
return err
Expand Down
13 changes: 13 additions & 0 deletions cmd/ctr-remote/commands/optimize.go
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"os"
"os/signal"
"time"

"github.com/containerd/containerd"
Expand Down Expand Up @@ -157,6 +158,18 @@ var OptimizeCommand = cli.Command{
f = wrapper(f)
}
layerConvertFunc := logWrapper(f)

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go func() {
// Cleanly cancel conversion
select {
case s := <-sigCh:
logrus.Infof("Got %v", s)
cancel()
case <-ctx.Done():
}
}()
convertOpts = append(convertOpts, converter.WithLayerConvertFunc(layerConvertFunc))
newImg, err := converter.Convert(ctx, client, targetRef, srcRef, convertOpts...)
if err != nil {
Expand Down
41 changes: 38 additions & 3 deletions estargz/build.go
Expand Up @@ -26,6 +26,7 @@ import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
Expand All @@ -47,6 +48,7 @@ type options struct {
prioritizedFiles []string
missedPrioritizedFiles *[]string
compression Compression
ctx context.Context
}

type Option func(o *options) error
Expand Down Expand Up @@ -103,6 +105,14 @@ func WithCompression(compression Compression) Option {
}
}

// WithContext specifies a context that can be used for clean canceleration.
func WithContext(ctx context.Context) Option {
return func(o *options) error {
o.ctx = ctx
return nil
}
}

// Blob is an eStargz blob.
type Blob struct {
io.ReadCloser
Expand Down Expand Up @@ -138,12 +148,29 @@ func Build(tarBlob *io.SectionReader, opt ...Option) (_ *Blob, rErr error) {
opts.compression = newGzipCompressionWithLevel(opts.compressionLevel)
}
layerFiles := newTempFiles()
ctx := opts.ctx
if ctx == nil {
ctx = context.Background()
}
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-done:
// nop
case <-ctx.Done():
layerFiles.CleanupAll()
}
}()
defer func() {
if rErr != nil {
if err := layerFiles.CleanupAll(); err != nil {
rErr = fmt.Errorf("failed to cleanup tmp files: %v: %w", err, rErr)
}
}
if cErr := ctx.Err(); cErr != nil {
rErr = fmt.Errorf("error from context %q: %w", cErr, rErr)
}
}()
tarBlob, err := decompressBlob(tarBlob, layerFiles)
if err != nil {
Expand Down Expand Up @@ -505,8 +532,9 @@ func newTempFiles() *tempFiles {
}

type tempFiles struct {
files []*os.File
filesMu sync.Mutex
files []*os.File
filesMu sync.Mutex
cleanupOnce sync.Once
}

func (tf *tempFiles) TempFile(dir, pattern string) (*os.File, error) {
Expand All @@ -520,7 +548,14 @@ func (tf *tempFiles) TempFile(dir, pattern string) (*os.File, error) {
return f, nil
}

func (tf *tempFiles) CleanupAll() error {
func (tf *tempFiles) CleanupAll() (err error) {
tf.cleanupOnce.Do(func() {
err = tf.cleanupAll()
})
return
}

func (tf *tempFiles) cleanupAll() error {
tf.filesMu.Lock()
defer tf.filesMu.Unlock()
var allErr []error
Expand Down
2 changes: 1 addition & 1 deletion nativeconverter/estargz/estargz.go
Expand Up @@ -77,7 +77,7 @@ func LayerConvertFunc(opts ...estargz.Option) converter.ConvertFunc {
}
defer ra.Close()
sr := io.NewSectionReader(ra, 0, desc.Size)
blob, err := estargz.Build(sr, opts...)
blob, err := estargz.Build(sr, append(opts, estargz.WithContext(ctx))...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion nativeconverter/zstdchunked/zstdchunked.go
Expand Up @@ -114,7 +114,7 @@ func LayerConvertFunc(opts ...estargz.Option) converter.ConvertFunc {
Metadata: metadata,
},
}))
blob, err := estargz.Build(uncompressedSR, opts...)
blob, err := estargz.Build(uncompressedSR, append(opts, estargz.WithContext(ctx))...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b484d42

Please sign in to comment.