diff --git a/go.mod b/go.mod index 93af640cef..13ddbfb634 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.3 github.com/urfave/cli/v2 v2.25.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/bbolt v1.3.7 golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/net v0.17.0 @@ -55,6 +56,8 @@ require ( k8s.io/utils v0.0.0-20230726121419-3b25d923346b ) +require github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + require ( github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20221215162035-5330a85ea652 // indirect github.com/Microsoft/go-winio v0.6.0 // indirect diff --git a/go.sum b/go.sum index 5b08546849..f1bb81bb6d 100644 --- a/go.sum +++ b/go.sum @@ -344,6 +344,10 @@ github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlI github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= diff --git a/pkg/converter/convert_unix.go b/pkg/converter/convert_unix.go index 77ceedde27..bd53811d7f 100644 --- a/pkg/converter/convert_unix.go +++ b/pkg/converter/convert_unix.go @@ -359,10 +359,6 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser, } func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, builderPath, sourceDir string) error { - if opt.ExternalBlobWriter == nil { - return fmt.Errorf("the 'ExternalBlobWriter' option requires the 'AttributesPath' option be specified") - } - workDir, err := ensureWorkDir(opt.WorkDir) if err != nil { return errors.Wrap(err, "ensure work directory") @@ -380,12 +376,17 @@ func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, buil } defer blobFifo.Close() - externalBlobPath := filepath.Join(workDir, "external-blob") - externalBlobFifo, err := fifo.OpenFifo(ctx, externalBlobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0640) - if err != nil { - return errors.Wrapf(err, "create fifo file for external blob") + externalBlobPath := "" + var externalBlobFifo io.ReadWriteCloser + if opt.ExternalBlobWriter != nil { + var err error + externalBlobPath = filepath.Join(workDir, "external-blob") + externalBlobFifo, err = fifo.OpenFifo(ctx, externalBlobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0640) + if err != nil { + return errors.Wrapf(err, "create fifo file for external blob") + } + defer externalBlobFifo.Close() } - defer externalBlobFifo.Close() go func() { err := tool.Pack(tool.PackOption{ @@ -421,14 +422,17 @@ func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, buil } return nil }) - eg.Go(func() error { - buffer := bufPool.Get().(*[]byte) - defer bufPool.Put(buffer) - if _, err := io.CopyBuffer(opt.ExternalBlobWriter, externalBlobFifo, *buffer); err != nil { - return errors.Wrap(err, "pack to nydus external blob") - } - return nil - }) + + if opt.ExternalBlobWriter != nil { + eg.Go(func() error { + buffer := bufPool.Get().(*[]byte) + defer bufPool.Put(buffer) + if _, err := io.CopyBuffer(opt.ExternalBlobWriter, externalBlobFifo, *buffer); err != nil { + return errors.Wrap(err, "pack to nydus external blob") + } + return nil + }) + } return eg.Wait() } diff --git a/pkg/converter/tool/builder.go b/pkg/converter/tool/builder.go index 8051522f7e..11259c5fe0 100644 --- a/pkg/converter/tool/builder.go +++ b/pkg/converter/tool/builder.go @@ -76,7 +76,7 @@ type outputJSON struct { Blobs []string } -func buildPackArgs(option PackOption) []string { +func buildPackArgs(option PackOption) ([]string, error) { if option.FsVersion == "" { option.FsVersion = "6" } @@ -100,7 +100,11 @@ func buildPackArgs(option PackOption) []string { args, "--blob-inline-meta", ) - if option.AttributesPath != "" { + info, err := os.Stat(option.SourcePath) + if err != nil { + return nil, err + } + if info.IsDir() { args = append( args, "--type", @@ -160,7 +164,7 @@ func buildPackArgs(option PackOption) []string { } args = append(args, option.SourcePath) - return args + return args, nil } func Pack(option PackOption) error { @@ -175,7 +179,10 @@ func Pack(option PackOption) error { defer cancel() } - args := buildPackArgs(option) + args, err := buildPackArgs(option) + if err != nil { + return err + } logrus.Debugf("\tCommand: %s %s", option.BuilderPath, strings.Join(args, " ")) cmd := exec.CommandContext(ctx, option.BuilderPath, args...) diff --git a/pkg/external/backend/backend.go b/pkg/external/backend/backend.go new file mode 100644 index 0000000000..902da7bda3 --- /dev/null +++ b/pkg/external/backend/backend.go @@ -0,0 +1,59 @@ +package backend + +import "context" + +const ( + DefaultFileChunkSize = 1024 * 1024 * 1 // 1 MB + DefaultThrottleFileSize = 1024 * 1024 * 2 // 2 MB +) + +type Backend struct { + Type string `json:"type"` + Config map[string]string `json:"config"` +} + +type Result struct { + Chunks []Chunk + Files []string + Backend Backend +} + +type File struct { + RelativePath string + Size int64 +} + +// Handler is the interface for backend handler. +type Handler interface { + // Backend returns the backend information. + Backend(ctx context.Context) (*Backend, error) + // Handle handles the file and returns the object information. + Handle(ctx context.Context, file File) ([]Chunk, error) +} + +type Chunk interface { + ObjectID() uint32 + ObjectContent() interface{} + ObjectOffset() uint64 +} + +// SplitObjectOffsets splits the total size into object offsets +// with the specified chunk size. +func SplitObjectOffsets(totalSize, chunkSize int64) []uint64 { + objectOffsets := []uint64{} + if chunkSize <= 0 { + return objectOffsets + } + + chunkN := totalSize / chunkSize + + for i := int64(0); i < chunkN; i++ { + objectOffsets = append(objectOffsets, uint64(i*chunkSize)) + } + + if totalSize%chunkSize > 0 { + objectOffsets = append(objectOffsets, uint64(chunkN*chunkSize)) + } + + return objectOffsets +} diff --git a/pkg/external/backend/backend_test.go b/pkg/external/backend/backend_test.go new file mode 100644 index 0000000000..2b5bbcadd9 --- /dev/null +++ b/pkg/external/backend/backend_test.go @@ -0,0 +1,15 @@ +package backend + +import ( + "fmt" + "testing" + "unsafe" + + "github.com/stretchr/testify/require" +) + +func TestLayout(t *testing.T) { + require.Equal(t, fmt.Sprintf("%d", 4096), fmt.Sprintf("%d", unsafe.Sizeof(Header{}))) + require.Equal(t, fmt.Sprintf("%d", 256), fmt.Sprintf("%d", unsafe.Sizeof(ChunkMeta{}))) + require.Equal(t, fmt.Sprintf("%d", 256), fmt.Sprintf("%d", unsafe.Sizeof(ObjectMeta{}))) +} diff --git a/pkg/external/backend/layout.go b/pkg/external/backend/layout.go new file mode 100644 index 0000000000..d534a78311 --- /dev/null +++ b/pkg/external/backend/layout.go @@ -0,0 +1,55 @@ +package backend + +const MetaMagic uint32 = 0x0AF5_E1E2 +const MetaVersion uint32 = 0x0000_0001 + +// Layout +// +// header: magic | version | chunk_meta_offset | object_meta_offset +// chunks: chunk_meta | chunk | chunk | ... +// objects: object_meta | [object_offsets] | object | object | ... + +// 4096 bytes +type Header struct { + Magic uint32 + Version uint32 + + ChunkMetaOffset uint32 + ObjectMetaOffset uint32 + + Reserved2 [4080]byte +} + +// 256 bytes +type ChunkMeta struct { + EntryCount uint32 + EntrySize uint32 + + Reserved [248]byte +} + +// 256 bytes +type ObjectMeta struct { + EntryCount uint32 + // = 0 means indeterminate entry size, and len(object_offsets) > 0. + // > 0 means fixed entry size, and len(object_offsets) == 0. + EntrySize uint32 + + Reserved [248]byte +} + +// 8 bytes +type ChunkOndisk struct { + ObjectIndex uint32 + Reserved [4]byte + ObjectOffset uint64 +} + +// 4 bytes +type ObjectOffset uint32 + +// Size depends on different external backend implementations +type ObjectOndisk struct { + EntrySize uint32 + EncodedData []byte +} diff --git a/pkg/external/backend/local/local.go b/pkg/external/backend/local/local.go new file mode 100644 index 0000000000..ca34eb2301 --- /dev/null +++ b/pkg/external/backend/local/local.go @@ -0,0 +1,68 @@ +package local + +import ( + "context" + + "github.com/containerd/nydus-snapshotter/pkg/external/backend" +) + +type object struct { + Path string `msgpack:"p"` +} + +type chunk struct { + objectID uint32 + objectContent object + objectOffset uint64 +} + +func (c *chunk) ObjectID() uint32 { + return c.objectID +} + +func (c *chunk) ObjectContent() interface{} { + return c.objectContent +} + +func (c *chunk) ObjectOffset() uint64 { + return c.objectOffset +} + +type Handler struct { + root string + objectID uint32 +} + +func NewHandler(root string) *Handler { + return &Handler{ + root: root, + objectID: 0, + } +} + +func (handler *Handler) Handle(_ context.Context, file backend.File) ([]backend.Chunk, error) { + chunks := []backend.Chunk{} + objectOffsets := backend.SplitObjectOffsets(file.Size, backend.DefaultFileChunkSize) + + for _, objectOffset := range objectOffsets { + chunks = append(chunks, &chunk{ + objectID: handler.objectID, + objectContent: object{ + Path: file.RelativePath, + }, + objectOffset: objectOffset, + }) + } + handler.objectID++ + + return chunks, nil +} + +func (handler *Handler) Backend(_ context.Context) (*backend.Backend, error) { + return &backend.Backend{ + Type: "local", + Config: map[string]string{ + "root": handler.root, + }, + }, nil +} diff --git a/pkg/external/backend/local/local_test.go b/pkg/external/backend/local/local_test.go new file mode 100644 index 0000000000..84377983ff --- /dev/null +++ b/pkg/external/backend/local/local_test.go @@ -0,0 +1,43 @@ +package local + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" +) + +type Object2 struct { + Kind string `msgpack:"k"` + Number uint64 `msgpack:"n"` + Path string `msgpack:"p"` +} + +func TestSerializeCompatibility(t *testing.T) { + object1 := object{ + Path: "test1", + } + object2 := Object2{} + buf, err := msgpack.Marshal(&object1) + require.NoError(t, err) + err = msgpack.Unmarshal(buf, &object2) + require.NoError(t, err) + require.Equal(t, object1.Path, object2.Path) + + object1 = object{} + object2 = Object2{ + Kind: "test2", + Number: 123, + Path: "test1", + } + object3 := Object2{} + buf, err = msgpack.Marshal(&object2) + require.NoError(t, err) + err = msgpack.Unmarshal(buf, &object1) + require.NoError(t, err) + require.Equal(t, object2.Path, object1.Path) + + err = msgpack.Unmarshal(buf, &object3) + require.NoError(t, err) + require.Equal(t, object2, object3) +} diff --git a/pkg/external/backend/walker.go b/pkg/external/backend/walker.go new file mode 100644 index 0000000000..e597fda6c4 --- /dev/null +++ b/pkg/external/backend/walker.go @@ -0,0 +1,112 @@ +package backend + +import ( + "context" + "io/fs" + "os" + "path/filepath" + + "github.com/pkg/errors" +) + +type Walker struct { +} + +func NewWalker() *Walker { + return &Walker{} +} + +func bfsWalk(path string, fn func(string, fs.FileInfo) error) error { + info, err := os.Lstat(path) + if err != nil { + return err + } + + if info.IsDir() { + files, err := os.ReadDir(path) + if err != nil { + return err + } + + dirs := []string{} + for _, file := range files { + filePath := filepath.Join(path, file.Name()) + if file.Type().IsRegular() { + info, err := file.Info() + if err != nil { + return err + } + if err := fn(filePath, info); err != nil { + return err + } + } + if file.IsDir() { + dirs = append(dirs, filePath) + } + } + + for _, dir := range dirs { + if err := bfsWalk(dir, fn); err != nil { + return err + } + } + } + + return nil +} + +func (walker *Walker) Walk(ctx context.Context, root string, handler Handler) (*Result, error) { + chunks := []Chunk{} + files := []string{} + + addFile := func(size int64, relativeTarget string) error { + target := filepath.Join("/", relativeTarget) + _chunks, err := handler.Handle(ctx, File{ + RelativePath: relativeTarget, + Size: size, + }) + if err != nil { + return err + } + chunks = append(chunks, _chunks...) + files = append(files, target) + return nil + } + + walkFiles := []func() error{} + + if err := bfsWalk(root, func(path string, info fs.FileInfo) error { + if info.Size() < DefaultThrottleFileSize { + return nil + } + + target, err := filepath.Rel(root, path) + if err != nil { + return err + } + walkFiles = append(walkFiles, func() error { + return addFile(info.Size(), target) + }) + + return nil + }); err != nil { + return nil, errors.Wrap(err, "walk directory") + } + + for i := 0; i < len(walkFiles); i++ { + if err := walkFiles[i](); err != nil { + return nil, errors.Wrap(err, "handle files") + } + } + + bkd, err := handler.Backend(ctx) + if err != nil { + return nil, err + } + + return &Result{ + Chunks: chunks, + Files: files, + Backend: *bkd, + }, nil +} diff --git a/pkg/external/external.go b/pkg/external/external.go new file mode 100644 index 0000000000..6a9775eb2f --- /dev/null +++ b/pkg/external/external.go @@ -0,0 +1,71 @@ +package external + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + + "github.com/containerd/nydus-snapshotter/pkg/external/backend" + "github.com/pkg/errors" +) + +type Options struct { + Dir string + Handler backend.Handler + MetaOutput string + BackendOutput string + AttributesOutput string +} + +type Attribute struct { + Pattern string +} + +// Handle handles the directory and generates the backend meta and attributes. +func Handle(ctx context.Context, opts Options) error { + walker := backend.NewWalker() + + backendRet, err := walker.Walk(ctx, opts.Dir, opts.Handler) + if err != nil { + return err + } + generators, err := NewGenerators(*backendRet) + if err != nil { + return err + } + ret, err := generators.Generate() + if err != nil { + return err + } + bkd := ret.Backend + attributes := []Attribute{} + for _, file := range ret.Files { + attributes = append(attributes, Attribute{ + Pattern: file, + }) + } + + if err := os.WriteFile(opts.MetaOutput, ret.Meta, 0644); err != nil { + return errors.Wrapf(err, "write meta to %s", opts.MetaOutput) + } + + backendBytes, err := json.MarshalIndent(bkd, "", " ") + if err != nil { + return err + } + if err := os.WriteFile(opts.BackendOutput, backendBytes, 0644); err != nil { + return errors.Wrapf(err, "write backend json to %s", opts.BackendOutput) + } + + attributeContent := []string{} + for _, attribute := range attributes { + attributeContent = append(attributeContent, fmt.Sprintf("%s\ttype=external", attribute.Pattern)) + } + if err := os.WriteFile(opts.AttributesOutput, []byte(strings.Join(attributeContent, "\n")), 0644); err != nil { + return errors.Wrapf(err, "write attributes to %s", opts.BackendOutput) + } + + return nil +} diff --git a/pkg/external/generator.go b/pkg/external/generator.go new file mode 100644 index 0000000000..ce54a8e4bb --- /dev/null +++ b/pkg/external/generator.go @@ -0,0 +1,149 @@ +package external + +import ( + "bytes" + "encoding/binary" + "unsafe" + + "github.com/containerd/nydus-snapshotter/pkg/external/backend" + "github.com/pkg/errors" + "github.com/vmihailenco/msgpack/v5" +) + +type Result struct { + Meta []byte + Backend backend.Backend + Files []string +} + +type MetaGenerator struct { + backend.Header + backend.ChunkMeta + Chunks []backend.ChunkOndisk + backend.ObjectMeta + ObjectOffsets []backend.ObjectOffset + Objects []backend.ObjectOndisk +} + +type Generator interface { + Generate() error +} + +type Generators struct { + MetaGenerator + Backend backend.Backend + Files []string +} + +func NewGenerators(ret backend.Result) (*Generators, error) { + objects := []backend.ObjectOndisk{} + chunks := []backend.ChunkOndisk{} + objectMap := make(map[uint32]uint32) // object id -> object index + + for _, chunk := range ret.Chunks { + objectID := chunk.ObjectID() + objectIndex, ok := objectMap[objectID] + if !ok { + objectIndex = uint32(len(objects)) + objectMap[objectID] = objectIndex + encoded, err := msgpack.Marshal(chunk.ObjectContent()) + if err != nil { + return nil, errors.Wrap(err, "encode to msgpack format") + } + objects = append(objects, backend.ObjectOndisk{ + EntrySize: uint32(len(encoded)), + EncodedData: encoded[:], + }) + } + chunks = append(chunks, backend.ChunkOndisk{ + ObjectIndex: objectIndex, + ObjectOffset: chunk.ObjectOffset(), + }) + } + + return &Generators{ + MetaGenerator: MetaGenerator{ + Chunks: chunks, + Objects: objects, + }, + Backend: ret.Backend, + Files: ret.Files, + }, nil +} + +func (generators *Generators) Generate() (*Result, error) { + meta, err := generators.MetaGenerator.Generate() + if err != nil { + return nil, errors.Wrap(err, "generate backend meta") + } + return &Result{ + Meta: meta, + Backend: generators.Backend, + Files: generators.Files, + }, nil +} + +func (generator *MetaGenerator) Generate() ([]byte, error) { + // prepare data + chunkMetaOffset := uint32(unsafe.Sizeof(generator.Header)) + generator.ChunkMeta.EntryCount = uint32(len(generator.Chunks)) + generator.ChunkMeta.EntrySize = uint32(unsafe.Sizeof(backend.ChunkOndisk{})) + objectMetaOffset := chunkMetaOffset + uint32(unsafe.Sizeof(generator.ChunkMeta)) + generator.ChunkMeta.EntryCount*generator.ChunkMeta.EntrySize + generator.Header = backend.Header{ + Magic: backend.MetaMagic, + Version: backend.MetaVersion, + ChunkMetaOffset: chunkMetaOffset, + ObjectMetaOffset: objectMetaOffset, + } + + generator.ObjectMeta.EntryCount = uint32(len(generator.Objects)) + objectOffsets := []backend.ObjectOffset{} + objectOffset := backend.ObjectOffset(objectMetaOffset + uint32(unsafe.Sizeof(generator.ObjectMeta)) + 4*generator.ObjectMeta.EntryCount) + var lastEntrySize uint32 + fixedEntrySize := true + for _, object := range generator.Objects { + if lastEntrySize > 0 && lastEntrySize != object.EntrySize { + fixedEntrySize = false + } + lastEntrySize = object.EntrySize + objectOffsets = append(objectOffsets, objectOffset) + objectOffset += backend.ObjectOffset(uint32(unsafe.Sizeof(object.EntrySize)) + uint32(len(object.EncodedData))) + } + if fixedEntrySize && len(generator.Objects) > 0 { + generator.ObjectMeta.EntrySize = generator.Objects[0].EntrySize + } + generator.ObjectOffsets = objectOffsets + + // dump bytes + var buf bytes.Buffer + + if err := binary.Write(&buf, binary.LittleEndian, generator.Header); err != nil { + return nil, errors.Wrap(err, "dump") + } + if err := binary.Write(&buf, binary.LittleEndian, generator.ChunkMeta); err != nil { + return nil, errors.Wrap(err, "dump") + } + for _, chunk := range generator.Chunks { + if err := binary.Write(&buf, binary.LittleEndian, chunk); err != nil { + return nil, errors.Wrap(err, "dump") + } + } + if err := binary.Write(&buf, binary.LittleEndian, generator.ObjectMeta); err != nil { + return nil, errors.Wrap(err, "dump") + } + for _, objectOffset := range generator.ObjectOffsets { + if err := binary.Write(&buf, binary.LittleEndian, objectOffset); err != nil { + return nil, errors.Wrap(err, "dump") + } + } + for _, object := range generator.Objects { + if err := binary.Write(&buf, binary.LittleEndian, object.EntrySize); err != nil { + return nil, errors.Wrap(err, "dump") + } + if err := binary.Write(&buf, binary.LittleEndian, object.EncodedData); err != nil { + return nil, errors.Wrap(err, "dump") + } + } + + return buf.Bytes(), nil +}