9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/newfilechange.go
@@ -32,6 +32,15 @@ type NewFileChange struct {
EncryptedKey string `json:"encrypted_key,omitempty"`
CustomMeta string `json:"custom_meta,omitempty"`
Attributes reference.Attributes `json:"attributes,omitempty"`

// IsResumable indicates that the request is part of a resumable upload
IsResumable bool `json:"is_resumable,omitempty"`
// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length,omitempty"`
// UploadOffset indicates a byte offset within the resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset,omitempty"`
// IsFinal indicates that the request carries the final chunk of the upload
IsFinal bool `json:"is_final,omitempty"`
}

func (nf *NewFileChange) ProcessChange(ctx context.Context,
115 changes: 115 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/chunk_writer.go
@@ -0,0 +1,115 @@
package filestore

import (
"context"
"errors"
"io"
"os"
)

//ChunkWriter is a chunked writer that appends content to a file
type ChunkWriter struct {
file string
writer *os.File
reader *os.File
offset int64
size int64
}

//NewChunkWriter creates a ChunkWriter for the given file path
func NewChunkWriter(file string) (*ChunkWriter, error) {
w := &ChunkWriter{
file: file,
}
var f *os.File
fi, err := os.Stat(file)
if errors.Is(err, os.ErrNotExist) {
f, err = os.Create(file)
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
} else {
//reopen the existing file for writing; plain 0644 permissions and no
//O_APPEND, so that WriteChunk can seek to arbitrary offsets
f, err = os.OpenFile(file, os.O_WRONLY, 0644)
if err != nil {
return nil, err
}

//resume from the end of the existing partial file
w.size = fi.Size()
w.offset = fi.Size()
}

w.writer = f

return w, nil
}

//Write implements io.Writer
func (w *ChunkWriter) Write(b []byte) (n int, err error) {
if w == nil || w.writer == nil {
return 0, os.ErrNotExist
}

written, err := w.writer.Write(b)

w.size += int64(written)

return written, err
}

//Read implements io.Reader; it lazily opens a read-only handle on first use
func (w *ChunkWriter) Read(p []byte) (n int, err error) {
if w == nil {
return 0, os.ErrNotExist
}

if w.reader == nil {
reader, err := os.Open(w.file)
if err != nil {
return 0, err
}

w.reader = reader
}

return w.reader.Read(p)
}

//WriteChunk writes data to the file at the given offset
func (w *ChunkWriter) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
if w == nil || w.writer == nil {
return 0, os.ErrNotExist
}

_, err := w.writer.Seek(offset, io.SeekStart)

if err != nil {
return 0, err
}

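//stream the chunk into place at the seek position; the size accounting below assumes chunks are non-overlapping and arrive in order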
n, err := io.Copy(w.writer, src)

w.offset += n
w.size += n

return n, err
}

//Size returns the current file length in bytes
func (w *ChunkWriter) Size() int64 {
if w == nil {
return 0
}
return w.size
}

//Close closes the underlying file handles
func (w *ChunkWriter) Close() {
if w == nil {
return
}

if w.writer != nil {
w.writer.Close()
}

if w.reader != nil {
w.reader.Close()
}
}
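For orientation, here is a minimal usage sketch of ChunkWriter assembling an upload from sequential chunks. The chunkedUpload helper, the target path, and the exact import path are illustrative assumptions, not part of this change.

package main

import (
	"context"
	"fmt"
	"io"
	"strings"

	// assumed import path; adjust to the real module layout
	"0chain.net/blobbercore/filestore"
)

// chunkedUpload is a hypothetical helper: it appends each chunk at the
// current end of the file and returns the total bytes assembled.
func chunkedUpload(path string, chunks []io.Reader) (int64, error) {
	w, err := filestore.NewChunkWriter(path)
	if err != nil {
		return 0, err
	}
	defer w.Close()

	offset := w.Size() // resume where a previous attempt left off
	for _, chunk := range chunks {
		n, err := w.WriteChunk(context.TODO(), offset, chunk)
		if err != nil {
			return offset, err
		}
		offset += n
	}
	return offset, nil
}

func main() {
	total, err := chunkedUpload("/tmp/upload.bin", []io.Reader{
		strings.NewReader("chunk-1 "),
		strings.NewReader("chunk-2"),
	})
	fmt.Println(total, err)
}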
93 changes: 93 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go
@@ -0,0 +1,93 @@
package filestore

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWrite(t *testing.T) {

fileName := filepath.Join(os.TempDir(), "testwrite_"+strconv.FormatInt(time.Now().Unix(), 10))

content := "this is full content"

w, err := NewChunkWriter(fileName)
require.NoError(t, err, "failed to create ChunkWriter")
defer w.Close()

_, err = w.Write([]byte(content))
require.NoError(t, err, "failed to ChunkWriter.Write")

buf := make([]byte, w.Size())

//read the full file content back
_, err = w.Read(buf)
require.NoError(t, err, "failed to ChunkWriter.Read")

assert.Equal(t, content, string(buf), "file content should match")
}

func TestWriteChunk(t *testing.T) {

chunk1 := "this is 1st chunked"

tempFile, err := ioutil.TempFile("", "")
require.NoError(t, err, "failed to create tempfile")

offset, err := tempFile.Write([]byte(chunk1))
require.NoError(t, err, "failed to write first chunk to tempfile")

fileName := tempFile.Name()
tempFile.Close()

w, err := NewChunkWriter(fileName)
require.NoError(t, err, "failed to create ChunkWriter")
defer w.Close()

chunk2 := "this is 2nd chunk"

_, err = w.WriteChunk(context.TODO(), int64(offset), strings.NewReader(chunk2))
require.NoError(t, err, "failed to ChunkWriter.WriteChunk")

buf := make([]byte, w.Size())

//read the full file content back
_, err = w.Read(buf)
require.NoError(t, err, "failed to ChunkWriter.Read")

assert.Equal(t, chunk1+chunk2, string(buf), "file content should match")
}
63 changes: 49 additions & 14 deletions code/go/0chain.net/blobbercore/filestore/fs_store.go
@@ -2,12 +2,14 @@ package filestore

import (
"bytes"
"context"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io"
"io/ioutil"
"mime/multipart"
"os"
"path/filepath"
@@ -481,29 +483,59 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
return nil, common.NewError("filestore_setup_error", "Error setting the fs store. "+err.Error())
}

h := sha1.New()
tempFilePath := fs.generateTempPath(allocation, fileData, connectionID)
dest, err := os.Create(tempFilePath)
dest, err := NewChunkWriter(tempFilePath)
if err != nil {
return nil, common.NewError("file_creation_error", err.Error())
}
defer dest.Close()
// infile, err := hdr.Open()
// if err != nil {
// return nil, common.NewError("file_reading_error", err.Error())
// }

fileRef := &FileOutputData{}
var fileReader io.Reader = infile

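//for a resumable upload, append this chunk at its upload offset and hash the chunk bytes as they stream in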
if fileData.IsResumable {
h := sha1.New()
written, err := dest.WriteChunk(context.TODO(), fileData.UploadOffset, io.TeeReader(fileReader, h))

if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}

//for a resumable upload this is the hash of the chunk just written, not of the whole file
fileRef.ContentHash = hex.EncodeToString(h.Sum(nil))
fileRef.Size = dest.Size()
fileRef.Name = fileData.Name
fileRef.Path = fileData.Path
fileRef.UploadOffset = fileData.UploadOffset + written
fileRef.UploadLength = fileData.UploadLength

if !fileData.IsFinal {
//skip whole-file hash computation until the last chunk has been uploaded
return fileRef, nil
}

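//final chunk: switch the reader to the assembled file so the hashing loop below re-reads the whole file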
fileReader = dest
}

h := sha1.New()
bytesBuffer := bytes.NewBuffer(nil)
//merkleHash := sha3.New256()
multiHashWriter := io.MultiWriter(h, bytesBuffer)
tReader := io.TeeReader(infile, multiHashWriter)
tReader := io.TeeReader(fileReader, multiHashWriter)
merkleHashes := make([]hash.Hash, 1024)
merkleLeaves := make([]util.Hashable, 1024)
for idx := range merkleHashes {
merkleHashes[idx] = sha3.New256()
}
fileSize := int64(0)
for {
written, err := io.CopyN(dest, tReader, CHUNK_SIZE)
var written int64

if fileData.IsResumable {
//all chunks have already been written; just re-read the local file to compute the hashes
written, err = io.CopyN(ioutil.Discard, tReader, CHUNK_SIZE)
} else {
written, err = io.CopyN(dest, tReader, CHUNK_SIZE)
}

if err != io.EOF && err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
@@ -519,7 +551,6 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
merkleHashes[offset].Write(dataBytes[i:end])
}

// merkleLeaves = append(merkleLeaves, util.NewStringHashable(hex.EncodeToString(merkleHash.Sum(nil))))
bytesBuffer.Reset()
if err != nil && err == io.EOF {
break
@@ -528,17 +559,21 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
for idx := range merkleHashes {
merkleLeaves[idx] = util.NewStringHashable(hex.EncodeToString(merkleHashes[idx].Sum(nil)))
}
//Logger.Info("File size", zap.Int64("file_size", fileSize))

var mt util.MerkleTreeI = &util.MerkleTree{}
mt.ComputeTree(merkleLeaves)
//Logger.Info("Calculated Merkle root", zap.String("merkle_root", mt.GetRoot()), zap.Int("merkle_leaf_count", len(merkleLeaves)))

fileRef := &FileOutputData{}
fileRef.ContentHash = hex.EncodeToString(h.Sum(nil))
//only set the whole-file hash when this is not a resumable upload, or when the final chunk has arrived
if !fileData.IsResumable || fileData.IsFinal {
fileRef.ContentHash = hex.EncodeToString(h.Sum(nil))
}

fileRef.Size = fileSize
fileRef.Name = fileData.Name
fileRef.Path = fileData.Path
fileRef.MerkleRoot = mt.GetRoot()
fileRef.UploadOffset = fileSize
fileRef.UploadLength = fileData.UploadLength

return fileRef, nil
}
14 changes: 14 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/store.go
@@ -14,6 +14,15 @@ type FileInputData struct {
Path string
Hash string
OnCloud bool

//IsResumable indicates that the request is part of a resumable upload
IsResumable bool
//UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64
//UploadOffset indicates a byte offset within the resource. The value MUST be a non-negative integer.
UploadOffset int64
//IsFinal indicates that the request carries the final chunk of the upload
IsFinal bool
}

type FileOutputData struct {
@@ -22,6 +31,11 @@ type FileOutputData struct {
MerkleRoot string
ContentHash string
Size int64

//UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64
//UploadOffset indicates a byte offset within the resource. The value MUST be a non-negative integer.
UploadOffset int64
}

type FileObjectHandler func(contentHash string, contentSize int64)
5 changes: 5 additions & 0 deletions code/go/0chain.net/blobbercore/handler/dto.go
@@ -12,6 +12,11 @@ type UploadResult struct {
Size int64 `json:"size"`
Hash string `json:"content_hash"`
MerkleRoot string `json:"merkle_root"`

//UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length"`
//UploadOffset indicates a byte offset within the resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset"`
}

type CommitResult struct {