Skip to content

Commit

Permalink
introduce external package
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Song <yansong.ys@antgroup.com>
  • Loading branch information
imeoer committed Mar 1, 2024
1 parent b319212 commit 86935c1
Show file tree
Hide file tree
Showing 12 changed files with 611 additions and 21 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.3
github.com/urfave/cli/v2 v2.25.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.etcd.io/bbolt v1.3.7
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/net v0.17.0
Expand All @@ -55,6 +56,8 @@ require (
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

require github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect

require (
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20221215162035-5330a85ea652 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlI
github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down
38 changes: 21 additions & 17 deletions pkg/converter/convert_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,6 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser,
}

func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, builderPath, sourceDir string) error {
if opt.ExternalBlobWriter == nil {
return fmt.Errorf("the 'ExternalBlobWriter' option requires the 'AttributesPath' option be specified")
}

workDir, err := ensureWorkDir(opt.WorkDir)
if err != nil {
return errors.Wrap(err, "ensure work directory")
Expand All @@ -380,12 +376,17 @@ func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, buil
}
defer blobFifo.Close()

externalBlobPath := filepath.Join(workDir, "external-blob")
externalBlobFifo, err := fifo.OpenFifo(ctx, externalBlobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0640)
if err != nil {
return errors.Wrapf(err, "create fifo file for external blob")
externalBlobPath := ""
var externalBlobFifo io.ReadWriteCloser
if opt.ExternalBlobWriter != nil {
var err error
externalBlobPath = filepath.Join(workDir, "external-blob")
externalBlobFifo, err = fifo.OpenFifo(ctx, externalBlobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0640)
if err != nil {
return errors.Wrapf(err, "create fifo file for external blob")
}
defer externalBlobFifo.Close()
}
defer externalBlobFifo.Close()

go func() {
err := tool.Pack(tool.PackOption{
Expand Down Expand Up @@ -421,14 +422,17 @@ func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, buil
}
return nil
})
eg.Go(func() error {
buffer := bufPool.Get().(*[]byte)
defer bufPool.Put(buffer)
if _, err := io.CopyBuffer(opt.ExternalBlobWriter, externalBlobFifo, *buffer); err != nil {
return errors.Wrap(err, "pack to nydus external blob")
}
return nil
})

if opt.ExternalBlobWriter != nil {
eg.Go(func() error {
buffer := bufPool.Get().(*[]byte)
defer bufPool.Put(buffer)
if _, err := io.CopyBuffer(opt.ExternalBlobWriter, externalBlobFifo, *buffer); err != nil {
return errors.Wrap(err, "pack to nydus external blob")
}
return nil
})
}

return eg.Wait()
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/converter/tool/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type outputJSON struct {
Blobs []string
}

func buildPackArgs(option PackOption) []string {
func buildPackArgs(option PackOption) ([]string, error) {
if option.FsVersion == "" {
option.FsVersion = "6"
}
Expand All @@ -100,7 +100,11 @@ func buildPackArgs(option PackOption) []string {
args,
"--blob-inline-meta",
)
if option.AttributesPath != "" {
info, err := os.Stat(option.SourcePath)
if err != nil {
return nil, err
}
if info.IsDir() {
args = append(
args,
"--type",
Expand Down Expand Up @@ -160,7 +164,7 @@ func buildPackArgs(option PackOption) []string {
}
args = append(args, option.SourcePath)

return args
return args, nil
}

func Pack(option PackOption) error {
Expand All @@ -175,7 +179,10 @@ func Pack(option PackOption) error {
defer cancel()
}

args := buildPackArgs(option)
args, err := buildPackArgs(option)
if err != nil {
return err
}
logrus.Debugf("\tCommand: %s %s", option.BuilderPath, strings.Join(args, " "))

cmd := exec.CommandContext(ctx, option.BuilderPath, args...)
Expand Down
59 changes: 59 additions & 0 deletions pkg/external/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package backend

import "context"

const (
DefaultFileChunkSize = 1024 * 1024 * 1 // 1 MB
DefaultThrottleFileSize = 1024 * 1024 * 2 // 2 MB
)

type Backend struct {
Type string `json:"type"`
Config map[string]string `json:"config"`
}

type Result struct {
Chunks []Chunk
Files []string
Backend Backend
}

type File struct {
RelativePath string
Size int64
}

// Handler is the interface for backend handler.
type Handler interface {
// Backend returns the backend information.
Backend(ctx context.Context) (*Backend, error)
// Handle handles the file and returns the object information.
Handle(ctx context.Context, file File) ([]Chunk, error)
}

type Chunk interface {
ObjectID() uint32
ObjectContent() interface{}
ObjectOffset() uint64
}

// SplitObjectOffsets splits the total size into object offsets
// with the specified chunk size.
func SplitObjectOffsets(totalSize, chunkSize int64) []uint64 {
objectOffsets := []uint64{}
if chunkSize <= 0 {
return objectOffsets
}

chunkN := totalSize / chunkSize

for i := int64(0); i < chunkN; i++ {
objectOffsets = append(objectOffsets, uint64(i*chunkSize))
}

if totalSize%chunkSize > 0 {
objectOffsets = append(objectOffsets, uint64(chunkN*chunkSize))
}

return objectOffsets
}
15 changes: 15 additions & 0 deletions pkg/external/backend/backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package backend

import (
"fmt"
"testing"
"unsafe"

"github.com/stretchr/testify/require"
)

func TestLayout(t *testing.T) {
require.Equal(t, fmt.Sprintf("%d", 4096), fmt.Sprintf("%d", unsafe.Sizeof(Header{})))
require.Equal(t, fmt.Sprintf("%d", 256), fmt.Sprintf("%d", unsafe.Sizeof(ChunkMeta{})))
require.Equal(t, fmt.Sprintf("%d", 256), fmt.Sprintf("%d", unsafe.Sizeof(ObjectMeta{})))
}
55 changes: 55 additions & 0 deletions pkg/external/backend/layout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package backend

const MetaMagic uint32 = 0x0AF5_E1E2
const MetaVersion uint32 = 0x0000_0001

// Layout
//
// header: magic | version | chunk_meta_offset | object_meta_offset
// chunks: chunk_meta | chunk | chunk | ...
// objects: object_meta | [object_offsets] | object | object | ...

// 4096 bytes
type Header struct {
Magic uint32
Version uint32

ChunkMetaOffset uint32
ObjectMetaOffset uint32

Reserved2 [4080]byte
}

// 256 bytes
type ChunkMeta struct {
EntryCount uint32
EntrySize uint32

Reserved [248]byte
}

// 256 bytes
type ObjectMeta struct {
EntryCount uint32
// = 0 means indeterminate entry size, and len(object_offsets) > 0.
// > 0 means fixed entry size, and len(object_offsets) == 0.
EntrySize uint32

Reserved [248]byte
}

// 8 bytes
type ChunkOndisk struct {
ObjectIndex uint32
Reserved [4]byte
ObjectOffset uint64
}

// 4 bytes
type ObjectOffset uint32

// Size depends on different external backend implementations
type ObjectOndisk struct {
EntrySize uint32
EncodedData []byte
}
68 changes: 68 additions & 0 deletions pkg/external/backend/local/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package local

import (
"context"

"github.com/containerd/nydus-snapshotter/pkg/external/backend"
)

type object struct {
Path string `msgpack:"p"`
}

type chunk struct {
objectID uint32
objectContent object
objectOffset uint64
}

func (c *chunk) ObjectID() uint32 {
return c.objectID
}

func (c *chunk) ObjectContent() interface{} {
return c.objectContent
}

func (c *chunk) ObjectOffset() uint64 {
return c.objectOffset
}

type Handler struct {
root string
objectID uint32
}

func NewHandler(root string) *Handler {
return &Handler{
root: root,
objectID: 0,
}
}

func (handler *Handler) Handle(_ context.Context, file backend.File) ([]backend.Chunk, error) {
chunks := []backend.Chunk{}
objectOffsets := backend.SplitObjectOffsets(file.Size, backend.DefaultFileChunkSize)

for _, objectOffset := range objectOffsets {
chunks = append(chunks, &chunk{
objectID: handler.objectID,
objectContent: object{
Path: file.RelativePath,
},
objectOffset: objectOffset,
})
}
handler.objectID++

return chunks, nil
}

func (handler *Handler) Backend(_ context.Context) (*backend.Backend, error) {
return &backend.Backend{
Type: "local",
Config: map[string]string{
"root": handler.root,
},
}, nil
}
43 changes: 43 additions & 0 deletions pkg/external/backend/local/local_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package local

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/vmihailenco/msgpack/v5"
)

type Object2 struct {
Kind string `msgpack:"k"`
Number uint64 `msgpack:"n"`
Path string `msgpack:"p"`
}

func TestSerializeCompatibility(t *testing.T) {
object1 := object{
Path: "test1",
}
object2 := Object2{}
buf, err := msgpack.Marshal(&object1)
require.NoError(t, err)
err = msgpack.Unmarshal(buf, &object2)
require.NoError(t, err)
require.Equal(t, object1.Path, object2.Path)

object1 = object{}
object2 = Object2{
Kind: "test2",
Number: 123,
Path: "test1",
}
object3 := Object2{}
buf, err = msgpack.Marshal(&object2)
require.NoError(t, err)
err = msgpack.Unmarshal(buf, &object1)
require.NoError(t, err)
require.Equal(t, object2.Path, object1.Path)

err = msgpack.Unmarshal(buf, &object3)
require.NoError(t, err)
require.Equal(t, object2, object3)
}
Loading

0 comments on commit 86935c1

Please sign in to comment.