From 23518fdecb140d8fc9311f9a7e7d8b353acbf60c Mon Sep 17 00:00:00 2001 From: YuQiang Date: Wed, 24 Jan 2024 12:22:52 +0800 Subject: [PATCH] feat: add nydusify commit command add nydusify commit command to commit a nydus container into nydus image Signed-off-by: YuQiang --- contrib/nydusify/cmd/nydusify.go | 89 +++ contrib/nydusify/go.mod | 11 +- contrib/nydusify/go.sum | 2 + contrib/nydusify/pkg/commiter/commiter.go | 671 ++++++++++++++++++++++ contrib/nydusify/pkg/commiter/manager.go | 133 +++++ contrib/nydusify/pkg/commiter/nsenter.go | 183 ++++++ contrib/nydusify/pkg/commiter/util.go | 25 + contrib/nydusify/pkg/utils/constant.go | 3 + 8 files changed, 1112 insertions(+), 5 deletions(-) create mode 100644 contrib/nydusify/pkg/commiter/commiter.go create mode 100644 contrib/nydusify/pkg/commiter/manager.go create mode 100644 contrib/nydusify/pkg/commiter/nsenter.go create mode 100644 contrib/nydusify/pkg/commiter/util.go diff --git a/contrib/nydusify/cmd/nydusify.go b/contrib/nydusify/cmd/nydusify.go index 517a6588529..fb3f32dcd2a 100644 --- a/contrib/nydusify/cmd/nydusify.go +++ b/contrib/nydusify/cmd/nydusify.go @@ -26,6 +26,7 @@ import ( "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/checker" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/checker/rule" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/chunkdict/generator" + "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/commiter" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/converter" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/copier" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/packer" @@ -1103,6 +1104,94 @@ func main() { return copier.Copy(context.Background(), opt) }, }, + { + Name: "commit", + Usage: "Commit a container into nydus image based a nydus image", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "work-dir", + Value: "./tmp", + Usage: "Working directory for image conversion", + EnvVars: []string{"WORK_DIR"}, + }, + &cli.StringFlag{ + Name: "nydus-image", + Value: "nydus-image", + Usage: "Path to the nydus-image binary, default to search in PATH", + EnvVars: []string{"NYDUS_IMAGE"}, + }, + &cli.StringFlag{ + Name: "containerd-address", + Value: "/run/containerd/containerd.sock", + Usage: "Socket listener address of containerd", + EnvVars: []string{"CONTAINERD_ADDR"}, + }, + &cli.StringFlag{ + Name: "container", + Required: true, + Usage: "Target container id", + EnvVars: []string{"CONTAINER"}, + }, + &cli.StringFlag{ + Name: "target", + Required: true, + Usage: "Target nydus image reference", + EnvVars: []string{"TARGET"}, + }, + &cli.IntFlag{ + Name: "maximum-times", + Required: false, + DefaultText: "400", + Value: 400, + Usage: "The maximum times allowed to be committed", + EnvVars: []string{"MAXIMUM_TIMES"}, + }, + &cli.StringSliceFlag{ + Name: "with-path", + Aliases: []string{"with-mount-path"}, + Required: false, + Usage: "The directory that need to be committed", + EnvVars: []string{"WITH_PATH"}, + }, + }, + Action: func(c *cli.Context) error { + setupLogLevel(c) + parsePaths := func(c *cli.Context, paths []string) ([]string, []string) { + withPaths := []string{} + withoutPaths := []string{} + + for _, path := range paths { + path = strings.TrimSpace(path) + if strings.HasPrefix(path, "!") { + path = strings.TrimLeft(path, "!") + path = strings.TrimRight(path, "/") + withoutPaths = append(withoutPaths, path) + } else { + withPaths = append(withPaths, path) + } + } + + return withPaths, withoutPaths + } + + withPaths, withoutPaths := parsePaths(c, c.StringSlice("with-path")) + opt := commiter.Opt { + WorkDir: c.String("work-dir"), + NydusImagePath: c.String("nydus-image"), + ContainerdAddress: c.String("containerd-address"), + ContainerID: c.String("container"), + TargetRef: c.String("target"), + MaximumTimes: c.Int("maximum-times"), + WithPaths: withPaths, + WithoutPaths: withoutPaths, + } + cm, err := commiter.NewCommiter(opt) + if err != nil { + return errors.Wrap(err, "create commiter") + } + return cm.Commit(context.Background(), opt) + }, + }, } if !utils.IsSupportedArch(runtime.GOARCH) { diff --git a/contrib/nydusify/go.mod b/contrib/nydusify/go.mod index 128670fe23f..e27c4360561 100644 --- a/contrib/nydusify/go.mod +++ b/contrib/nydusify/go.mod @@ -3,6 +3,7 @@ module github.com/dragonflyoss/nydus/contrib/nydusify go 1.21 require ( + github.com/Microsoft/hcsshim v0.11.4 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aws/aws-sdk-go-v2 v1.24.0 github.com/aws/aws-sdk-go-v2/config v1.26.2 @@ -10,6 +11,8 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.9 github.com/aws/aws-sdk-go-v2/service/s3 v1.47.7 github.com/containerd/containerd v1.7.11 + github.com/containerd/continuity v0.4.3 + github.com/containerd/log v0.1.0 github.com/containerd/nydus-snapshotter v0.13.4 github.com/distribution/reference v0.5.0 github.com/docker/cli v24.0.7+incompatible @@ -18,6 +21,9 @@ require ( github.com/google/uuid v1.5.0 github.com/hashicorp/go-hclog v1.6.2 github.com/hashicorp/go-plugin v1.6.0 + github.com/klauspost/compress v1.17.4 + github.com/moby/buildkit v0.12.4 + github.com/moby/sys/sequential v0.5.0 github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0-rc5 github.com/pkg/errors v0.9.1 @@ -35,7 +41,6 @@ require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20231105174938-2b5cbb29f3e2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect - github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9 // indirect @@ -53,9 +58,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect - github.com/containerd/continuity v0.4.3 // indirect github.com/containerd/fifo v1.1.0 // indirect - github.com/containerd/log v0.1.0 // indirect github.com/containerd/stargz-snapshotter v0.15.1 // indirect github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect github.com/containerd/ttrpc v1.2.2 // indirect @@ -77,7 +80,6 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -86,7 +88,6 @@ require ( github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/moby/locker v1.0.1 // indirect github.com/moby/sys/mountinfo v0.7.1 // indirect - github.com/moby/sys/sequential v0.5.0 // indirect github.com/moby/sys/signal v0.7.0 // indirect github.com/oklog/run v1.1.0 // indirect github.com/opencontainers/runc v1.1.11 // indirect diff --git a/contrib/nydusify/go.sum b/contrib/nydusify/go.sum index bc9bd2eb34d..388ac8c5461 100644 --- a/contrib/nydusify/go.sum +++ b/contrib/nydusify/go.sum @@ -186,6 +186,8 @@ github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU= github.com/miekg/pkcs11 v1.1.1/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= +github.com/moby/buildkit v0.12.4 h1:yKZDsObXLKarXqUx7YMnaB+TKv810bBhq0XLFWbkjT0= +github.com/moby/buildkit v0.12.4/go.mod h1:XG74uz06nPWQpnxYwgCryrVidvor0+ElUxGosbZPQG4= github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/sys/mountinfo v0.7.1 h1:/tTvQaSJRr2FshkhXiIpux6fQ2Zvc4j7tAhMTStAG2g= diff --git a/contrib/nydusify/pkg/commiter/commiter.go b/contrib/nydusify/pkg/commiter/commiter.go new file mode 100644 index 00000000000..763cc7eb68e --- /dev/null +++ b/contrib/nydusify/pkg/commiter/commiter.go @@ -0,0 +1,671 @@ +package commiter + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/reference/docker" + "github.com/containerd/nydus-snapshotter/pkg/converter" + "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/commiter/diff" + parserPkg "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/parser" + "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/provider" + "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/utils" + "github.com/dustin/go-humanize" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" +) + +var NydusRefSuffix = "_nydus_v2" + +type Opt struct { + WorkDir string + ContainerdAddress string + NydusImagePath string + + ContainerID string + TargetRef string + RemoteInsecure bool + MaximumTimes int + + WithPaths []string + WithoutPaths []string +} + +type Commiter struct { + workDir string + builder string + manager *Manager +} + +func NewCommiter(opt Opt) (*Commiter, error) { + if err := os.MkdirAll(opt.WorkDir, 0755); err != nil { + return nil, errors.Wrap(err, "prepare work dir") + } + + workDir, err := os.MkdirTemp(opt.WorkDir, "nydusify-commiter-") + if err != nil { + return nil, errors.Wrap(err, "create temp dir") + } + + cm, err := NewManager(opt.ContainerdAddress) + if err != nil { + return nil, errors.Wrap(err, "new container manager") + } + return &Commiter{ + workDir: workDir, + builder: opt.NydusImagePath, + manager: cm, + }, nil +} + +func (cm *Commiter) Commit(ctx context.Context, opt Opt) error { + targetRef, err := AppendNydusSuffix(opt.TargetRef) + if err != nil { + return errors.Wrap(err, "parse target image name") + } + + inspect, err := cm.manager.Inspect(ctx, opt.ContainerID) + if err != nil { + return errors.Wrap(err, "inspect container") + } + + logrus.Infof("pulling base bootstrap") + start := time.Now() + image, committedLayers, err := cm.pullBootstrap(ctx, inspect.Image, "bootstrap-base", opt.RemoteInsecure) + if err != nil { + return errors.Wrap(err, "pull base bootstrap") + } + logrus.Infof("pulled base bootstrap, elapsed: %s", time.Since(start)) + + if committedLayers >= opt.MaximumTimes { + return fmt.Errorf("reached maximum committed times %d", opt.MaximumTimes) + } + + mountList := NewMountList() + + var upperBlob *Blob + mountBlobs := make([]Blob, len(opt.WithPaths)) + commit := func() error { + eg := errgroup.Group{} + eg.Go(func() error { + var upperBlobDigest *digest.Digest + if err := withRetry(func() error { + upperBlobDigest, err = cm.commitUpperByDiff(ctx, mountList.Add, opt.WithPaths, opt.WithoutPaths, inspect.LowerDirs, inspect.UpperDir, "blob-upper") + return err + }, 3); err != nil { + return errors.Wrap(err, "commit upper") + } + logrus.Infof("pushing blob for upper") + start := time.Now() + upperBlobDesc, err := cm.pushBlob(ctx, "blob-upper", *upperBlobDigest, opt.TargetRef, opt.RemoteInsecure) + if err != nil { + return errors.Wrap(err, "push upper blob") + } + upperBlob = &Blob{ + Name: "blob-upper", + Desc: *upperBlobDesc, + } + logrus.Infof("pushed blob for upper, elapsed: %s", time.Since(start)) + return nil + }) + + if len(opt.WithPaths) > 0 { + for idx := range opt.WithPaths { + func(idx int) { + eg.Go(func() error { + withPath := opt.WithPaths[idx] + name := fmt.Sprintf("blob-mount-%d", idx) + var mountBlobDigest *digest.Digest + if err := withRetry(func() error { + mountBlobDigest, err = cm.commitMountByNSEnter(ctx, inspect.Pid, withPath, name) + return err + }, 3); err != nil { + return errors.Wrap(err, "commit mount") + } + logrus.Infof("pushing blob for mount") + start := time.Now() + mountBlobDesc, err := cm.pushBlob(ctx, name, *mountBlobDigest, opt.TargetRef, opt.RemoteInsecure) + if err != nil { + return errors.Wrap(err, "push mount blob") + } + mountBlobs[idx] = Blob{ + Name: name, + Desc: *mountBlobDesc, + } + logrus.Infof("pushed blob for mount, elapsed: %s", time.Since(start)) + return nil + }) + }(idx) + } + } + + if err := eg.Wait(); err != nil { + return err + } + + appendedEg := errgroup.Group{} + appendedMutex := sync.Mutex{} + if len(mountList.paths) > 0 { + logrus.Infof("need commit appened mount path: %s", strings.Join(mountList.paths, ", ")) + } + for idx := range mountList.paths { + func(idx int) { + appendedEg.Go(func() error { + mountPath := mountList.paths[idx] + name := fmt.Sprintf("blob-appended-mount-%d", idx) + var mountBlobDigest *digest.Digest + if err := withRetry(func() error { + mountBlobDigest, err = cm.commitMountByNSEnter(ctx, inspect.Pid, mountPath, name) + return err + }, 3); err != nil { + return errors.Wrap(err, "commit appended mount") + } + logrus.Infof("pushing blob for appended mount") + start := time.Now() + mountBlobDesc, err := cm.pushBlob(ctx, name, *mountBlobDigest, opt.TargetRef, opt.RemoteInsecure) + if err != nil { + return errors.Wrap(err, "push appended mount blob") + } + appendedMutex.Lock() + mountBlobs = append(mountBlobs, Blob{ + Name: name, + Desc: *mountBlobDesc, + }) + appendedMutex.Unlock() + logrus.Infof("pushed blob for appended mount, elapsed: %s", time.Since(start)) + return nil + }) + }(idx) + } + + return appendedEg.Wait() + } + + if err := cm.pause(ctx, opt.ContainerID, commit); err != nil { + return errors.Wrap(err, "pause container to commit") + } + + logrus.Infof("merging base and upper bootstraps") + blobDigests, bootstrapDiffID, err := cm.mergeBootstrap(ctx, *upperBlob, mountBlobs, "bootstrap-base", "bootstrap-merged.tar") + if err != nil { + return errors.Wrap(err, "merge bootstrap") + } + + logrus.Infof("pushing committed image to %s", targetRef) + if err := cm.pushManifest(ctx, *image, *bootstrapDiffID, targetRef, "bootstrap-merged.tar", blobDigests, upperBlob, mountBlobs, opt.RemoteInsecure); err != nil { + return errors.Wrap(err, "push manifest") + } + + return nil +} + +func (cm *Commiter) pullBootstrap(ctx context.Context, ref, bootstrapName string, insecure bool) (*parserPkg.Image, int, error) { + remoter, err := provider.DefaultRemote(ref, insecure) + if err != nil { + return nil, 0, errors.Wrap(err, "create remote") + } + + parser, err := parserPkg.New(remoter, "amd64") + if err != nil { + return nil, 0, errors.Wrap(err, "create parser") + } + + var parsed *parserPkg.Parsed + parsed, err = parser.Parse(ctx) + if err != nil { + if utils.RetryWithHTTP(err) { + remoter.MaybeWithHTTP(err) + parsed, err = parser.Parse(ctx) + if err != nil { + return nil, 0, errors.Wrap(err, "parse nydus image") + } + } else { + return nil, 0, errors.Wrap(err, "parse nydus image") + } + } + if parsed.NydusImage == nil { + return nil, 0, fmt.Errorf("not a nydus image: %s", ref) + } + + bootstrapDesc := parserPkg.FindNydusBootstrapDesc(&parsed.NydusImage.Manifest) + if bootstrapDesc == nil { + return nil, 0, fmt.Errorf("not found nydus bootstrap layer") + } + committedLayers := 0 + _commitBlobs := bootstrapDesc.Annotations[utils.LayerAnnotationNydusCommitBlobs] + if _commitBlobs != "" { + committedLayers = len(strings.Split(_commitBlobs, ",")) + logrus.Infof("detected the committed layers: %d", committedLayers) + } + + target := filepath.Join(cm.workDir, bootstrapName) + reader, err := parser.PullNydusBootstrap(ctx, parsed.NydusImage) + if err != nil { + return nil, 0, errors.Wrap(err, "pull bootstrap layer") + } + defer reader.Close() + + if err := utils.UnpackFile(reader, utils.BootstrapFileNameInLayer, target); err != nil { + return nil, 0, errors.Wrap(err, "unpack bootstrap layer") + } + + return parsed.NydusImage, committedLayers, nil +} + +func (cm *Commiter) commitUpperByDiff(ctx context.Context, appendMount func(path string), withPaths []string, withoutPaths []string, lowerDirs, upperDir, blobName string) (*digest.Digest, error) { + logrus.Infof("committing upper") + start := time.Now() + + blobPath := filepath.Join(cm.workDir, blobName) + blob, err := os.Create(blobPath) + if err != nil { + return nil, errors.Wrap(err, "create upper blob file") + } + defer blob.Close() + + digester := digest.SHA256.Digester() + counter := Counter{} + tarWc, err := converter.Pack(ctx, io.MultiWriter(blob, digester.Hash(), &counter), converter.PackOption{ + WorkDir: cm.workDir, + FsVersion: "6", + Compressor: "lz4_block", + BuilderPath: cm.builder, + }) + if err != nil { + return nil, errors.Wrap(err, "initialize pack to blob") + } + + if err := diff.Diff(ctx, appendMount, withPaths, withoutPaths, tarWc, lowerDirs, upperDir); err != nil { + return nil, errors.Wrap(err, "make diff") + } + + if err := tarWc.Close(); err != nil { + return nil, errors.Wrap(err, "pack to blob") + } + + blobDigest := digester.Digest() + logrus.Infof("committed upper, size: %s, elapsed: %s", humanize.Bytes(uint64(counter.Size())), time.Since(start)) + + return &blobDigest, nil +} + +func (cm *Commiter) pushBlob(ctx context.Context, blobName string, blobDigest digest.Digest, targetRef string, insecure bool) (*ocispec.Descriptor, error) { + blobRa, err := local.OpenReader(filepath.Join(cm.workDir, blobName)) + if err != nil { + return nil, errors.Wrap(err, "open reader for upper blob") + } + + blobDesc := ocispec.Descriptor{ + Digest: blobDigest, + Size: blobRa.Size(), + MediaType: utils.MediaTypeNydusBlob, + Annotations: map[string]string{ + utils.LayerAnnotationUncompressed: blobDigest.String(), + utils.LayerAnnotationNydusBlob: "true", + }, + } + + remoter, err := provider.DefaultRemote(targetRef, insecure) + if err != nil { + return nil, errors.Wrap(err, "create remote") + } + + if err := remoter.Push(ctx, blobDesc, true, io.NewSectionReader(blobRa, 0, blobRa.Size())); err != nil { + if utils.RetryWithHTTP(err) { + remoter.MaybeWithHTTP(err) + if err := remoter.Push(ctx, blobDesc, true, io.NewSectionReader(blobRa, 0, blobRa.Size())); err != nil { + return nil, errors.Wrap(err, "push blob") + } + } else { + return nil, errors.Wrap(err, "push blob") + } + } + return &blobDesc, nil +} + +func (cm *Commiter) pause(ctx context.Context, containerID string, handle func() error) error { + logrus.Infof("pausing container: %s", containerID) + if err := cm.manager.Pause(ctx, containerID); err != nil { + return errors.Wrap(err, "pause container") + } + + if err := handle(); err != nil { + logrus.Infof("unpausing container: %s", containerID) + if err := cm.manager.UnPause(ctx, containerID); err != nil { + logrus.Errorf("unpause container: %s", containerID) + } + return err + } + + logrus.Infof("unpausing container: %s", containerID) + return cm.manager.UnPause(ctx, containerID) +} + +func (cm *Commiter) pushManifest( + ctx context.Context, nydusImage parserPkg.Image, bootstrapDiffID digest.Digest, targetRef, bootstrapName string, blobDigests []digest.Digest, upperBlob *Blob, mountBlobs []Blob, insecure bool, +) error { + lowerBlobLayers := []ocispec.Descriptor{} + for idx := range nydusImage.Manifest.Layers { + layer := nydusImage.Manifest.Layers[idx] + if layer.MediaType == utils.MediaTypeNydusBlob { + lowerBlobLayers = append(lowerBlobLayers, layer) + } + } + + // Push image config + config := nydusImage.Config + + config.RootFS.DiffIDs = []digest.Digest{} + for idx := range lowerBlobLayers { + config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, lowerBlobLayers[idx].Digest) + } + for idx := range mountBlobs { + mountBlob := mountBlobs[idx] + config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, mountBlob.Desc.Digest) + } + config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, upperBlob.Desc.Digest) + config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, bootstrapDiffID) + + configBytes, configDesc, err := cm.makeDesc(ctx, config, nydusImage.Manifest.Config) + if err != nil { + return errors.Wrap(err, "make config desc") + } + + remoter, err := provider.DefaultRemote(targetRef, insecure) + if err != nil { + return errors.Wrap(err, "create remote") + } + + if err := remoter.Push(ctx, *configDesc, true, bytes.NewReader(configBytes)); err != nil { + if utils.RetryWithHTTP(err) { + remoter.MaybeWithHTTP(err) + if err := remoter.Push(ctx, *configDesc, true, bytes.NewReader(configBytes)); err != nil { + return errors.Wrap(err, "push image config") + } + } else { + return errors.Wrap(err, "push image config") + } + } + + // Push bootstrap layer + bootstrapTarPath := filepath.Join(cm.workDir, bootstrapName) + bootstrapTar, err := os.Open(bootstrapTarPath) + if err != nil { + return errors.Wrap(err, "open bootstrap tar file") + } + + bootstrapTarGzPath := filepath.Join(cm.workDir, bootstrapName+".gz") + bootstrapTarGz, err := os.Create(bootstrapTarGzPath) + if err != nil { + return errors.Wrap(err, "create bootstrap tar.gz file") + } + defer bootstrapTarGz.Close() + + digester := digest.SHA256.Digester() + gzWriter := gzip.NewWriter(io.MultiWriter(bootstrapTarGz, digester.Hash())) + if _, err := io.Copy(gzWriter, bootstrapTar); err != nil { + return errors.Wrap(err, "compress bootstrap tar to tar.gz") + } + if err := gzWriter.Close(); err != nil { + return errors.Wrap(err, "close gzip writer") + } + + ra, err := local.OpenReader(bootstrapTarGzPath) + if err != nil { + return errors.Wrap(err, "open reader for upper blob") + } + defer ra.Close() + + commitBlobs := []string{} + for idx := range mountBlobs { + mountBlob := mountBlobs[idx] + commitBlobs = append(commitBlobs, mountBlob.Desc.Digest.String()) + } + commitBlobs = append(commitBlobs, upperBlob.Desc.Digest.String()) + + bootstrapDesc := ocispec.Descriptor{ + Digest: digester.Digest(), + Size: ra.Size(), + MediaType: ocispec.MediaTypeImageLayerGzip, + Annotations: map[string]string{ + converter.LayerAnnotationFSVersion: "6", + converter.LayerAnnotationNydusBootstrap: "true", + utils.LayerAnnotationNydusCommitBlobs: strings.Join(commitBlobs, ","), + }, + } + + bootstrapRc, err := os.Open(bootstrapTarGzPath) + if err != nil { + return errors.Wrapf(err, "open bootstrap %s", bootstrapTarGzPath) + } + defer bootstrapRc.Close() + if err := remoter.Push(ctx, bootstrapDesc, true, bootstrapRc); err != nil { + return errors.Wrap(err, "push bootstrap layer") + } + + // Push image manifest + layers := lowerBlobLayers + for idx := range mountBlobs { + mountBlob := mountBlobs[idx] + layers = append(layers, mountBlob.Desc) + } + layers = append(layers, upperBlob.Desc) + layers = append(layers, bootstrapDesc) + + nydusImage.Manifest.Config = *configDesc + nydusImage.Manifest.Layers = layers + + manifestBytes, manifestDesc, err := cm.makeDesc(ctx, nydusImage.Manifest, nydusImage.Desc) + if err != nil { + return errors.Wrap(err, "make config desc") + } + if err := remoter.Push(ctx, *manifestDesc, false, bytes.NewReader(manifestBytes)); err != nil { + return errors.Wrap(err, "push image manifest") + } + + return nil +} + +func (cm *Commiter) makeDesc(ctx context.Context, x interface{}, oldDesc ocispec.Descriptor) ([]byte, *ocispec.Descriptor, error) { + data, err := json.MarshalIndent(x, "", " ") + if err != nil { + return nil, nil, errors.Wrap(err, "json marshal") + } + dgst := digest.SHA256.FromBytes(data) + + newDesc := oldDesc + newDesc.Size = int64(len(data)) + newDesc.Digest = dgst + + return data, &newDesc, nil +} + +func (cm *Commiter) commitMountByNSEnter(ctx context.Context, containerPid int, sourceDir, name string) (*digest.Digest, error) { + logrus.Infof("committing mount: %s", sourceDir) + start := time.Now() + + blobPath := filepath.Join(cm.workDir, name) + blob, err := os.Create(blobPath) + if err != nil { + return nil, errors.Wrap(err, "create mount blob file") + } + defer blob.Close() + + digester := digest.SHA256.Digester() + counter := Counter{} + tarWc, err := converter.Pack(ctx, io.MultiWriter(blob, &counter, digester.Hash()), converter.PackOption{ + WorkDir: cm.workDir, + FsVersion: "6", + Compressor: "lz4_block", + BuilderPath: cm.builder, + }) + if err != nil { + return nil, errors.Wrap(err, "initialize pack to blob") + } + + if err := copyFromContainer(ctx, containerPid, sourceDir, tarWc); err != nil { + return nil, errors.Wrapf(err, "copy %s from pid %d", sourceDir, containerPid) + } + + if err := tarWc.Close(); err != nil { + return nil, errors.Wrap(err, "pack to blob") + } + + mountBlobDigest := digester.Digest() + + logrus.Infof("committed mount: %s, size: %s, elapsed %s", sourceDir, humanize.Bytes(uint64(counter.Size())), time.Since(start)) + + return &mountBlobDigest, nil +} + +func (cm *Commiter) mergeBootstrap( + ctx context.Context, upperBlob Blob, mountBlobs []Blob, baseBootstrapName, mergedBootstrapName string, +) ([]digest.Digest, *digest.Digest, error) { + baseBootstrap := filepath.Join(cm.workDir, baseBootstrapName) + upperBlobRa, err := local.OpenReader(filepath.Join(cm.workDir, upperBlob.Name)) + if err != nil { + return nil, nil, errors.Wrap(err, "open reader for upper blob") + } + + mergedBootstrap := filepath.Join(cm.workDir, mergedBootstrapName) + bootstrap, err := os.Create(mergedBootstrap) + if err != nil { + return nil, nil, errors.Wrap(err, "create upper blob file") + } + defer bootstrap.Close() + + digester := digest.SHA256.Digester() + writer := io.MultiWriter(bootstrap, digester.Hash()) + + layers := []converter.Layer{} + layers = append(layers, converter.Layer{ + Digest: upperBlob.Desc.Digest, + ReaderAt: upperBlobRa, + }) + for idx := range mountBlobs { + mountBlob := mountBlobs[idx] + mountBlobRa, err := local.OpenReader(filepath.Join(cm.workDir, mountBlob.Name)) + if err != nil { + return nil, nil, errors.Wrap(err, "open reader for mount blob") + } + layers = append(layers, converter.Layer{ + Digest: mountBlob.Desc.Digest, + ReaderAt: mountBlobRa, + }) + } + + blobDigests, err := converter.Merge(ctx, layers, writer, converter.MergeOption{ + WorkDir: cm.workDir, + FsVersion: "6", + ParentBootstrapPath: baseBootstrap, + WithTar: true, + BuilderPath: cm.builder, + }) + if err != nil { + return nil, nil, errors.Wrap(err, "merge bootstraps") + } + bootstrapDiffID := digester.Digest() + + return blobDigests, &bootstrapDiffID, nil +} + +func copyFromContainer(ctx context.Context, containerPid int, source string, target io.Writer) error { + config := &Config{ + Mount: true, + Target: containerPid, + } + + stderr, err := config.ExecuteContext(ctx, target, "tar", "--xattrs", "--ignore-failed-read", "--absolute-names", "-cf", "-", source) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("execute tar: %s", strings.TrimSpace(stderr))) + } + if stderr != "" { + logrus.Warnf("from container: %s", stderr) + } + + return nil +} + +type MountList struct { + mutex sync.Mutex + paths []string +} + +func NewMountList() *MountList { + return &MountList{ + paths: make([]string, 0), + } +} + +func (ml *MountList) Add(path string) { + ml.mutex.Lock() + defer ml.mutex.Unlock() + + ml.paths = append(ml.paths, path) +} + +type Blob struct { + Name string + BootstrapName string + Desc ocispec.Descriptor +} + +func withRetry(handle func() error, total int) error { + for { + total-- + err := handle() + if err == nil { + return nil + } + + if total > 0 { + logrus.WithError(err).Warnf("retry (remain %d times)", total) + continue + } + + return err + } +} + +// AppendNydusSuffix appends nydus suffix to the image `ref` and return nydus image. +func AppendNydusSuffix(ref string) (string, error) { + named, err := docker.ParseDockerRef(ref) + if err != nil { + return "", errors.Wrapf(err, "invalid image reference: %s", ref) + } + if _, ok := named.(docker.Digested); ok { + return "", fmt.Errorf("unsupported digested image reference: %s", ref) + } + named = docker.TagNameOnly(named) + if strings.HasSuffix(named.String(), NydusRefSuffix) { + return ref, nil + } + target := named.String() + NydusRefSuffix + return target, nil +} + +// HasNydusSuffix checks weather if the image `ref` has `_nydus_v2` suffix. +func HasNydusSuffix(ref string) (bool, error) { + named, err := docker.ParseDockerRef(ref) + if err != nil { + return false, errors.Wrapf(err, "invalid image reference: %s", ref) + } + if _, ok := named.(docker.Digested); ok { + return false, fmt.Errorf("unsupported digested image reference: %s", ref) + } + named = docker.TagNameOnly(named) + return strings.HasSuffix(named.String(), NydusRefSuffix), nil +} diff --git a/contrib/nydusify/pkg/commiter/manager.go b/contrib/nydusify/pkg/commiter/manager.go new file mode 100644 index 00000000000..97832f0910c --- /dev/null +++ b/contrib/nydusify/pkg/commiter/manager.go @@ -0,0 +1,133 @@ +package commiter + +import ( + "context" + "encoding/json" + "strings" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/oci" + + "github.com/pkg/errors" +) + +type InspectResult struct { + LowerDirs string + UpperDir string + Image string + Mounts []Mount + Pid int +} + +type Mount struct { + Destination string + Source string +} + +type Manager struct { + address string +} + +func NewManager(addr string) (*Manager, error) { + return &Manager{ + address: addr, + }, nil +} + +func (m *Manager) Pause(ctx context.Context, containerID string) error { + ctx = namespaces.WithNamespace(ctx, "default") + client, err := containerd.New(m.address) + if err != nil { + return errors.Wrapf(err, "create client") + } + container, err := client.LoadContainer(ctx, containerID) + if err != nil { + return errors.Wrapf(err, "load container") + } + task, err := container.Task(ctx, nil) + if err != nil { + return errors.Wrapf(err, "obtain container task") + } + + return task.Pause(ctx) +} + +func (m *Manager) UnPause(ctx context.Context, containerID string) error { + ctx = namespaces.WithNamespace(ctx, "default") + client, err := containerd.New(m.address) + if err != nil { + return errors.Wrapf(err, "create client") + } + container, err := client.LoadContainer(ctx, containerID) + if err != nil { + return errors.Wrapf(err, "load container") + } + task, err := container.Task(ctx, nil) + if err != nil { + return errors.Wrapf(err, "obtain container task") + } + + return task.Resume(ctx) +} + +func (m *Manager) Inspect(ctx context.Context, containerID string) (*InspectResult, error) { + ctx = namespaces.WithNamespace(ctx, "default") + client, err := containerd.New(m.address) + if err != nil { + return nil, errors.Wrapf(err, "create client") + } + container, err := client.LoadContainer(ctx, containerID) + if err != nil { + return nil, errors.Wrapf(err, "load container") + } + _image, err := container.Image(ctx) + if err != nil { + return nil, errors.Wrapf(err, "obtain container image") + } + image := _image.Name() + + task, err := container.Task(ctx, nil) + if err != nil { + return nil, errors.Wrapf(err, "obtain container task") + } + pid := int(task.Pid()) + + containerInfo, err := container.Info(ctx, containerd.WithoutRefreshedMetadata) + if err != nil { + return nil, errors.Wrapf(err, "obtain container info") + } + spec := oci.Spec{} + if err := json.Unmarshal(containerInfo.Spec.GetValue(), &spec); err != nil { + return nil, errors.Wrapf(err, "unmarshal json") + } + mounts := []Mount{} + for _, mount := range spec.Mounts { + mounts = append(mounts, Mount{ + Destination: mount.Destination, + Source: mount.Source, + }) + } + + snapshot := client.SnapshotService("nydus") + if err != nil { + return nil, errors.Wrapf(err, "obtain container info") + } + lowerDirs := "" + upperDir := "" + if mount, err := snapshot.Mounts(ctx, containerInfo.SnapshotKey); err != nil { + return nil, errors.Wrapf(err, "get snapshot mount") + } else { + // snapshot Mount Options[0] "workdir=$workdir", Options[1] "upperdir=$upperdir", Options[2] "lowerdir=$lowerdir". + lowerDirs = strings.TrimPrefix(mount[0].Options[2], "lowerdir=") + upperDir = strings.TrimPrefix(mount[0].Options[1], "upperdir=") + } + + return &InspectResult{ + LowerDirs: lowerDirs, + UpperDir: upperDir, + Image: image, + Mounts: mounts, + Pid: pid, + }, nil +} diff --git a/contrib/nydusify/pkg/commiter/nsenter.go b/contrib/nydusify/pkg/commiter/nsenter.go new file mode 100644 index 00000000000..c85b3d38090 --- /dev/null +++ b/contrib/nydusify/pkg/commiter/nsenter.go @@ -0,0 +1,183 @@ +package commiter + +import ( + "bytes" + "context" + "fmt" + "io" + "os/exec" + "strconv" + "time" +) + +// Config is the nsenter configuration used to generate +// nsenter command +type Config struct { + Cgroup bool // Enter cgroup namespace + CgroupFile string // Cgroup namespace location, default to /proc/PID/ns/cgroup + FollowContext bool // Set SELinux security context + GID int // GID to use to execute given program + IPC bool // Enter IPC namespace + IPCFile string // IPC namespace location, default to /proc/PID/ns/ipc + Mount bool // Enter mount namespace + MountFile string // Mount namespace location, default to /proc/PID/ns/mnt + Net bool // Enter network namespace + NetFile string // Network namespace location, default to /proc/PID/ns/net + NoFork bool // Do not fork before executing the specified program + PID bool // Enter PID namespace + PIDFile string // PID namespace location, default to /proc/PID/ns/pid + PreserveCredentials bool // Preserve current UID/GID when entering namespaces + RootDirectory string // Set the root directory, default to target process root directory + Target int // Target PID (required) + UID int // UID to use to execute given program + User bool // Enter user namespace + UserFile string // User namespace location, default to /proc/PID/ns/user + UTS bool // Enter UTS namespace + UTSFile string // UTS namespace location, default to /proc/PID/ns/uts + WorkingDirectory string // Set the working directory, default to target process working directory +} + +// Execute executs the givne command with a default background context +func (c *Config) Execute(writer io.Writer, program string, args ...string) (string, error) { + return c.ExecuteContext(context.Background(), writer, program, args...) +} + +// ExecuteContext the given program using the given nsenter configuration and given context +// and return stdout/stderr or an error if command has failed +func (c *Config) ExecuteContext(ctx context.Context, writer io.Writer, program string, args ...string) (string, error) { + cmd, err := c.buildCommand(ctx) + if err != nil { + return "", fmt.Errorf("Error while building command: %v", err) + } + + // Prepare command + var srderr bytes.Buffer + rc, err := cmd.StdoutPipe() + if err != nil { + return "", fmt.Errorf("Open stdout pipe: %v", err) + } + defer rc.Close() + + cmd.Stderr = &srderr + cmd.Args = append(cmd.Args, program) + cmd.Args = append(cmd.Args, args...) + + if err := cmd.Start(); err != nil { + return srderr.String(), err + } + + // HACK: we can't wait rc.Close happen automatically when process + // exits, so must check process state and call rc.Close() by manually. + go func() { + for { + time.Sleep(time.Second * 1) + if cmd.ProcessState != nil && cmd.ProcessState.Exited() { + rc.Close() + break + } + } + }() + + if _, err := io.Copy(writer, rc); err != nil { + return srderr.String(), err + } + + return srderr.String(), cmd.Wait() +} + +func (c *Config) buildCommand(ctx context.Context) (*exec.Cmd, error) { + if c.Target == 0 { + return nil, fmt.Errorf("Target must be specified") + } + + var args []string + args = append(args, "--target", strconv.Itoa(c.Target)) + + if c.Cgroup { + if c.CgroupFile != "" { + args = append(args, fmt.Sprintf("--cgroup=%s", c.CgroupFile)) + } else { + args = append(args, "--cgroup") + } + } + + if c.FollowContext { + args = append(args, "--follow-context") + } + + if c.GID != 0 { + args = append(args, "--setgid", strconv.Itoa(c.GID)) + } + + if c.IPC { + if c.IPCFile != "" { + args = append(args, fmt.Sprintf("--ip=%s", c.IPCFile)) + } else { + args = append(args, "--ipc") + } + } + + if c.Mount { + if c.MountFile != "" { + args = append(args, fmt.Sprintf("--mount=%s", c.MountFile)) + } else { + args = append(args, "--mount") + } + } + + if c.Net { + if c.NetFile != "" { + args = append(args, fmt.Sprintf("--net=%s", c.NetFile)) + } else { + args = append(args, "--net") + } + } + + if c.NoFork { + args = append(args, "--no-fork") + } + + if c.PID { + if c.PIDFile != "" { + args = append(args, fmt.Sprintf("--pid=%s", c.PIDFile)) + } else { + args = append(args, "--pid") + } + } + + if c.PreserveCredentials { + args = append(args, "--preserve-credentials") + } + + if c.RootDirectory != "" { + args = append(args, "--root", c.RootDirectory) + } + + if c.UID != 0 { + args = append(args, "--setuid", strconv.Itoa(c.UID)) + } + + if c.User { + if c.UserFile != "" { + args = append(args, fmt.Sprintf("--user=%s", c.UserFile)) + } else { + args = append(args, "--user") + } + } + + if c.UTS { + if c.UTSFile != "" { + args = append(args, fmt.Sprintf("--uts=%s", c.UTSFile)) + } else { + args = append(args, "--uts") + } + } + + if c.WorkingDirectory != "" { + args = append(args, "--wd", c.WorkingDirectory) + } + + cmd := exec.CommandContext(ctx, "nsenter", args...) + + return cmd, nil +} diff --git a/contrib/nydusify/pkg/commiter/util.go b/contrib/nydusify/pkg/commiter/util.go new file mode 100644 index 00000000000..ba41415abf8 --- /dev/null +++ b/contrib/nydusify/pkg/commiter/util.go @@ -0,0 +1,25 @@ +package commiter + +import ( + // "context" + // "fmt" + // "io" + // "strings" + "sync/atomic" + // "github.com/nydusaccelerator/nydus-cli/pkg/nsenter" + // "github.com/pkg/errors" + // "github.com/sirupsen/logrus" +) + +type Counter struct { + n int64 +} + +func (c *Counter) Write(p []byte) (n int, err error) { + atomic.AddInt64(&c.n, int64(len(p))) + return len(p), nil +} + +func (c *Counter) Size() (n int64) { + return c.n +} diff --git a/contrib/nydusify/pkg/utils/constant.go b/contrib/nydusify/pkg/utils/constant.go index 0c9694c0339..672fecca3ef 100644 --- a/contrib/nydusify/pkg/utils/constant.go +++ b/contrib/nydusify/pkg/utils/constant.go @@ -21,4 +21,7 @@ const ( LayerAnnotationNydusReferenceBlobIDs = "containerd.io/snapshot/nydus-reference-blob-ids" LayerAnnotationUncompressed = "containerd.io/uncompressed" + + LayerAnnotationNydusCommitBlobs = "containerd.io/snapshot/nydus-commit-blobs" + LayerAnnotationNydusBlobIDs = "containerd.io/snapshot/nydus-blob-ids" )