Skip to content

Commit

Permalink
feat: split input file to chunks with specified redundancy (#4600)
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Mar 4, 2024
1 parent be9a15b commit f7b3586
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 32 deletions.
165 changes: 135 additions & 30 deletions cmd/bee/cmd/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,62 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync/atomic"

"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/spf13/cobra"
)

// putter is a putter that stores all the split chunk addresses of a file
type putter struct {
chunkAddresses []string
cb func(chunk swarm.Chunk) error
}

func (s *putter) Put(ctx context.Context, chunk swarm.Chunk) error {
s.chunkAddresses = append(s.chunkAddresses, chunk.Address().String())
return nil
func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error {
return s.cb(chunk)
}
// newPutter returns a putter that forwards every chunk it is asked to
// store to the supplied callback.
func newPutter(cb func(ch swarm.Chunk) error) *putter {
	p := &putter{cb: cb}
	return p
}

var _ storage.Putter = (*putter)(nil)

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool) pipelineFunc {
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, 0)
pipe := builder.NewPipelineBuilder(ctx, s, encrypt, rLevel)
return builder.FeedPipeline(ctx, pipe, r)
}
}

func (c *command) initSplitCmd() error {
optionNameInputFile := "input-file"
optionNameOutputFile := "output-file"
cmd := &cobra.Command{
Use: "split",
Short: "Split a file into a list chunks. The 1st line is the root hash",
Short: "Split a file into chunks",
}

splitRefs(cmd)
splitChunks(cmd)
c.root.AddCommand(cmd)
return nil
}

func splitRefs(cmd *cobra.Command) {
optionNameInputFile := "input-file"
optionNameOutputFile := "output-file"
optionNameRedundancyLevel := "r-level"

c := &cobra.Command{
Use: "refs",
Short: "Write only the chunk reference to the output file",
RunE: func(cmd *cobra.Command, args []string) error {
inputFileName, err := cmd.Flags().GetString(optionNameInputFile)
if err != nil {
Expand All @@ -53,6 +74,10 @@ func (c *command) initSplitCmd() error {
if err != nil {
return fmt.Errorf("get output file name: %w", err)
}
rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel)
if err != nil {
return fmt.Errorf("get redundancy level: %w", err)
}

v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
Expand All @@ -70,44 +95,124 @@ func (c *command) initSplitCmd() error {
}
defer reader.Close()

logger.Info("splitting", "file", inputFileName)
store := new(putter)

p := requestPipelineFn(store, false)
address, err := p(context.Background(), reader)
if err != nil {
return fmt.Errorf("bmt pipeline: %w", err)
}

logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
logger.Info("writing output", "file", outputFileName)

var refs []string
store := newPutter(func(ch swarm.Chunk) error {
refs = append(refs, ch.Address().String())
return nil
})
writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open output file: %w", err)
}
defer writer.Close()

logger.Debug("write root", "hash", address)
_, err = writer.WriteString(fmt.Sprintf("%s\n", address))
p := requestPipelineFn(store, false, redundancy.Level(rLevel))
rootRef, err := p(context.Background(), reader)
if err != nil {
return fmt.Errorf("pipeline: %w", err)
}

logger.Debug("write root", "hash", rootRef)
_, err = writer.WriteString(fmt.Sprintf("%s\n", rootRef))
if err != nil {
return fmt.Errorf("write root hash: %w", err)
}
for _, chunkAddress := range store.chunkAddresses {
logger.Debug("write chunk", "hash", chunkAddress)
_, err = writer.WriteString(fmt.Sprintf("%s\n", chunkAddress))
for _, ref := range refs {
logger.Debug("write chunk", "hash", ref)
_, err = writer.WriteString(fmt.Sprintf("%s\n", ref))
if err != nil {
return fmt.Errorf("write chunk address: %w", err)
}
}
logger.Info("done", "hashes", len(store.chunkAddresses))
logger.Info("done", "root", rootRef.String(), "chunks", len(refs))
return nil
},
}

cmd.Flags().String(optionNameVerbosity, "info", "verbosity level")
cmd.Flags().String(optionNameInputFile, "", "input file")
cmd.Flags().String(optionNameOutputFile, "", "output file")
cmd.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile)
c.Flags().String(optionNameInputFile, "", "input file")
c.Flags().String(optionNameOutputFile, "", "output file")
c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")

c.root.AddCommand(cmd)
return nil
c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputFile)

cmd.AddCommand(c)
}

// splitChunks registers the "chunks" subcommand on cmd. It splits the
// input file through the pipeline with the requested redundancy level
// and writes each resulting chunk into the output directory as a
// separate file named after the chunk address.
func splitChunks(cmd *cobra.Command) {
	optionNameInputFile := "input-file"
	optionNameOutputDir := "output-dir"
	optionNameRedundancyLevel := "r-level"

	c := &cobra.Command{
		Use:   "chunks",
		Short: "Write the chunks to the output directory",
		RunE: func(cmd *cobra.Command, args []string) error {
			inputFileName, err := cmd.Flags().GetString(optionNameInputFile)
			if err != nil {
				return fmt.Errorf("get input file name: %w", err)
			}
			outputDir, err := cmd.Flags().GetString(optionNameOutputDir)
			if err != nil {
				// Fixed copy-paste bug: this previously reported
				// "get output file name" for the output-dir flag.
				return fmt.Errorf("get output dir: %w", err)
			}
			// The output directory must already exist and be a directory.
			info, err := os.Stat(outputDir)
			if err != nil {
				return fmt.Errorf("stat output dir: %w", err)
			}
			if !info.IsDir() {
				return fmt.Errorf("output dir %s is not a directory", outputDir)
			}
			rLevel, err := cmd.Flags().GetInt(optionNameRedundancyLevel)
			if err != nil {
				return fmt.Errorf("get redundancy level: %w", err)
			}
			v, err := cmd.Flags().GetString(optionNameVerbosity)
			if err != nil {
				return fmt.Errorf("get verbosity: %w", err)
			}
			v = strings.ToLower(v)
			logger, err := newLogger(cmd, v)
			if err != nil {
				return fmt.Errorf("new logger: %w", err)
			}
			reader, err := os.Open(inputFileName)
			if err != nil {
				return fmt.Errorf("open input file: %w", err)
			}
			defer reader.Close()

			logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
			logger.Info("writing output", "dir", outputDir)

			// Counter is atomic: the pipeline may invoke Put from more
			// than one goroutine — NOTE(review): the test-side putter
			// guards its map with a mutex for the same reason.
			var chunksCount atomic.Int64
			store := newPutter(func(chunk swarm.Chunk) error {
				filePath := filepath.Join(outputDir, chunk.Address().String())
				if err := os.WriteFile(filePath, chunk.Data(), 0644); err != nil {
					return err
				}
				chunksCount.Add(1)
				return nil
			})

			p := requestPipelineFn(store, false, redundancy.Level(rLevel))
			rootRef, err := p(context.Background(), reader)
			if err != nil {
				return fmt.Errorf("pipeline: %w", err)
			}
			logger.Info("done", "root", rootRef.String(), "chunks", chunksCount.Load())
			return nil
		},
	}
	c.Flags().String(optionNameInputFile, "", "input file")
	c.Flags().String(optionNameOutputDir, "", "output dir")
	c.Flags().Int(optionNameRedundancyLevel, 0, "redundancy level")
	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
	c.MarkFlagsRequiredTogether(optionNameInputFile, optionNameOutputDir)

	cmd.AddCommand(c)
}
116 changes: 114 additions & 2 deletions cmd/bee/cmd/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ package cmd_test

import (
"bufio"
"bytes"
"context"
crand "crypto/rand"
"io"
"math/rand"
"os"
"path"
"path/filepath"
"sync"
"testing"

"github.com/ethersphere/bee/cmd/bee/cmd"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

func TestDBSplit(t *testing.T) {
func TestDBSplitRefs(t *testing.T) {
t.Parallel()

s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB
Expand All @@ -34,7 +43,7 @@ func TestDBSplit(t *testing.T) {

outputFileName := path.Join(t.TempDir(), "output")

err = newCommand(t, cmd.WithArgs("split", "--input-file", inputFileName, "--output-file", outputFileName)).Execute()
err = newCommand(t, cmd.WithArgs("split", "refs", "--input-file", inputFileName, "--output-file", outputFileName)).Execute()
if err != nil {
t.Fatal(err)
}
Expand All @@ -60,3 +69,106 @@ func TestDBSplit(t *testing.T) {
t.Fatalf("got %d hashes, want %d", gotHashes, wantHashes)
}
}

func TestDBSplitChunks(t *testing.T) {
t.Parallel()

s := (rand.Intn(10) + 10) * 1024 // rand between 10 and 20 KB
buf := make([]byte, s)
_, err := crand.Read(buf)
if err != nil {
t.Fatal(err)
}

inputFileName := path.Join(t.TempDir(), "input")
err = os.WriteFile(inputFileName, buf, 0644)
if err != nil {
t.Fatal(err)
}

dir := path.Join(t.TempDir(), "chunks")
err = os.Mkdir(dir, os.ModePerm)
if err != nil {
t.Fatal(err)
}

err = newCommand(t, cmd.WithArgs("split", "chunks", "--input-file", inputFileName, "--output-dir", dir, "--r-level", "3")).Execute()
if err != nil {
t.Fatal(err)
}

// split the file manually and compare output with the split commands output.
putter := &putter{chunks: make(map[string]swarm.Chunk)}
p := requestPipelineFn(putter, false, redundancy.Level(3))
_, err = p(context.Background(), bytes.NewReader(buf))
if err != nil {
t.Fatal(err)
}

entries, err := os.ReadDir(dir)
if err != nil {
t.Fatal(err)
}

if len(entries) != len(putter.chunks) {
t.Fatal("number of chunks does not match")
}
for _, entry := range entries {
ref := entry.Name()
if _, ok := putter.chunks[ref]; !ok {
t.Fatalf("chunk %s not found", ref)
}
err, ok := compare(filepath.Join(dir, ref), putter.chunks[ref])
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatalf("chunk %s does not match", ref)
}
delete(putter.chunks, ref)
}

if len(putter.chunks) != 0 {
t.Fatalf("want 0 chunks left, got %d", len(putter.chunks))
}
}

// compare reports whether the file at path contains exactly the payload
// of chunk. A non-nil error means the file could not be read.
// NOTE(review): the (error, bool) return order is non-idiomatic Go
// ((bool, error) is conventional) but is kept so existing callers,
// which destructure `err, ok := compare(...)`, keep working.
func compare(path string, chunk swarm.Chunk) (error, bool) {
	// os.ReadFile replaces the original Open + io.ReadAll + deferred
	// Close — same behavior, fewer moving parts.
	b, err := os.ReadFile(path)
	if err != nil {
		return err, false
	}
	return nil, bytes.Equal(b, chunk.Data())
}

// putter is a thread-safe test double for a chunk store: it records
// every chunk it receives, keyed by the chunk's address string.
type putter struct {
	chunks map[string]swarm.Chunk
	mu     sync.Mutex // guards chunks; Put may be called concurrently by the pipeline
}

// Put records chunk under its address. The map is allocated lazily so
// the zero value of putter is safe to use (a nil-map write would panic).
func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.chunks == nil {
		s.chunks = make(map[string]swarm.Chunk)
	}
	s.chunks[chunk.Address().String()] = chunk
	return nil
}

// pipelineFunc consumes a reader and yields the root Swarm address of
// the split content.
type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

// requestPipelineFn builds a pipelineFunc that streams the reader
// through a freshly constructed pipeline writing its chunks to s, with
// the given encryption flag and redundancy level.
func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
	return func(ctx context.Context, r io.Reader) (swarm.Address, error) {
		return builder.FeedPipeline(ctx, builder.NewPipelineBuilder(ctx, s, encrypt, rLevel), r)
	}
}

0 comments on commit f7b3586

Please sign in to comment.