diff --git a/code/go/0chain.net/blobbercore/allocation/newfilechange.go b/code/go/0chain.net/blobbercore/allocation/newfilechange.go
index 280e2cd6d..346ea89fb 100644
--- a/code/go/0chain.net/blobbercore/allocation/newfilechange.go
+++ b/code/go/0chain.net/blobbercore/allocation/newfilechange.go
@@ -32,6 +32,15 @@ type NewFileChange struct {
 	EncryptedKey string               `json:"encrypted_key,omitempty"`
 	CustomMeta   string               `json:"custom_meta,omitempty"`
 	Attributes   reference.Attributes `json:"attributes,omitempty"`
+
+	// IsResumable marks the request as a resumable upload.
+	IsResumable bool `json:"is_resumable,omitempty"`
+	// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
+	UploadLength int64 `json:"upload_length,omitempty"`
+	// UploadOffset indicates a byte offset within a resource. The value MUST be a non-negative integer.
+	UploadOffset int64 `json:"upload_offset,omitempty"`
+	// IsFinal marks the request as carrying the final chunk.
+	IsFinal bool `json:"is_final,omitempty"`
 }
 
 func (nf *NewFileChange) ProcessChange(ctx context.Context,
diff --git a/code/go/0chain.net/blobbercore/filestore/chunk_writer.go b/code/go/0chain.net/blobbercore/filestore/chunk_writer.go
new file mode 100644
index 000000000..e1e027f03
--- /dev/null
+++ b/code/go/0chain.net/blobbercore/filestore/chunk_writer.go
@@ -0,0 +1,134 @@
+package filestore
+
+import (
+	"context"
+	"errors"
+	"io"
+	"os"
+)
+
+// ChunkWriter implements a chunk writer that appends content to the file.
+type ChunkWriter struct {
+	file   string
+	writer *os.File
+	reader *os.File
+	offset int64
+	size   int64
+}
+
+// NewChunkWriter creates a ChunkWriter for the given file path.
+func NewChunkWriter(file string) (*ChunkWriter, error) {
+	w := &ChunkWriter{
+		file: file,
+	}
+	var f *os.File
+	fi, err := os.Stat(file)
+	if errors.Is(err, os.ErrNotExist) {
+		f, err = os.Create(file)
+		if err != nil {
+			return nil, err
+		}
+	} else if err != nil {
+		// Stat failed for a reason other than "not exists": report it
+		// instead of dereferencing the nil FileInfo below.
+		return nil, err
+	} else {
+		// O_RDONLY|O_WRONLY is an invalid combination and os.ModeAppend is a
+		// mode bit, not a permission. Open write-only and seek to the end so
+		// plain Write calls append. O_APPEND is deliberately avoided because
+		// it would defeat the Seek done by WriteChunk.
+		f, err = os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0644)
+		if err != nil {
+			return nil, err
+		}
+		if _, err = f.Seek(0, io.SeekEnd); err != nil {
+			f.Close()
+			return nil, err
+		}
+
+		w.size = fi.Size()
+		w.offset = fi.Size()
+	}
+
+	w.writer = f
+
+	return w, nil
+}
+
+// Write implements io.Writer.
+func (w *ChunkWriter) Write(b []byte) (n int, err error) {
+	if w == nil || w.writer == nil {
+		return 0, os.ErrNotExist
+	}
+
+	written, err := w.writer.Write(b)
+
+	w.size += int64(written)
+
+	return written, err
+}
+
+// Read implements io.Reader, lazily opening the file on first call.
+func (w *ChunkWriter) Read(p []byte) (n int, err error) {
+	if w == nil {
+		return 0, os.ErrNotExist
+	}
+	if w.reader == nil {
+		reader, err := os.Open(w.file)
+		if err != nil {
+			return 0, err
+		}
+
+		w.reader = reader
+	}
+
+	return w.reader.Read(p)
+}
+
+// WriteChunk writes data from src into the file at the given offset.
+func (w *ChunkWriter) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
+	if w == nil || w.writer == nil {
+		return 0, os.ErrNotExist
+	}
+
+	_, err := w.writer.Seek(offset, io.SeekStart)
+
+	if err != nil {
+		return 0, err
+	}
+
+	n, err := io.Copy(w.writer, src)
+
+	// the new position is where this chunk ends, not "previous offset + n"
+	w.offset = offset + n
+	// only grow size when the chunk extends past the current end;
+	// rewriting existing bytes must not inflate it
+	if w.offset > w.size {
+		w.size = w.offset
+	}
+
+	return n, err
+}
+
+// Size length in bytes for regular files
+func (w *ChunkWriter) Size() int64 {
+	if w == nil {
+		return 0
+	}
+	return w.size
+}
+
+// Close closes the underlying files.
+func (w *ChunkWriter) Close() {
+	if w == nil {
+		return
+	}
+
+	if w.writer != nil {
+		w.writer.Close()
+	}
+
+	if w.reader != nil {
+		w.reader.Close()
+	}
+}
diff --git a/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go b/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go
new file mode 100644
index 000000000..716dc3189
--- /dev/null
+++ b/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go
@@ -0,0 +1,94 @@
+package filestore
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestWrite(t *testing.T) {
+
+	fileName := filepath.Join(os.TempDir(), "testwrite_"+strconv.FormatInt(time.Now().Unix(), 10))
+
+	content := "this is full content"
+
+	w, err := NewChunkWriter(fileName)
+	if err != nil {
+		require.NoError(t, err, "failed to create ChunkWriter")
+		return
+	}
+	defer w.Close()
+
+	_, err = w.Write([]byte(content))
+
+	if err != nil {
+		require.NoError(t, err, "failed to ChunkWriter.WriteChunk")
+		return
+	}
+
+	buf := make([]byte, w.Size())
+
+	//read all lines from file
+	_, err = w.Read(buf)
+	if err != nil {
+		require.NoError(t, err, "failed to ChunkWriter.Read")
+		return
+	}
+
+	assert.Equal(t, content, string(buf), "File content should be same")
+}
+
+func TestWriteChunk(t *testing.T) {
+
+	chunk1 := "this is 1st chunked"
+
+	tempFile, err := ioutil.TempFile("", "")
+
+	if err != nil {
+		require.NoError(t, err, "failed to create tempfile")
+		return
+	}
+	offset, err := tempFile.Write([]byte(chunk1))
+	if err != nil {
+		require.NoError(t, err, "failed to write first chunk to tempfile")
+		return
+	}
+
+	fileName := tempFile.Name()
+	tempFile.Close()
+
+	w, err := NewChunkWriter(fileName)
+	if err != nil {
+		require.NoError(t, err, "failed to create ChunkWriter")
+		return
+	}
+	defer w.Close()
+
+	chunk2 := "this is 2nd chunk"
+
+	_, err = w.WriteChunk(context.TODO(), int64(offset), strings.NewReader(chunk2))
+
+	if err != nil {
+		require.NoError(t, err, "failed to ChunkWriter.WriteChunk")
+		return
+	}
+
+	buf := make([]byte, w.Size())
+
+	//read all lines from file
+	_, err = w.Read(buf)
+	if err != nil {
+		require.NoError(t, err, "failed to ChunkWriter.Read")
+		return
+	}
+
+	assert.Equal(t, chunk1+chunk2, string(buf), "File content should be same")
+}
diff --git a/code/go/0chain.net/blobbercore/filestore/fs_store.go b/code/go/0chain.net/blobbercore/filestore/fs_store.go
index edb41a286..9ce06d8d6 100644
--- a/code/go/0chain.net/blobbercore/filestore/fs_store.go
+++ b/code/go/0chain.net/blobbercore/filestore/fs_store.go
@@ -2,12 +2,14 @@ package filestore
 
 import (
 	"bytes"
+	"context"
 	"crypto/sha1"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
 	"hash"
 	"io"
+	"io/ioutil"
 	"mime/multipart"
 	"os"
 	"path/filepath"
@@ -481,21 +483,43 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
 		return nil, common.NewError("filestore_setup_error", "Error setting the fs store. "+err.Error())
 	}
 
-	h := sha1.New()
 	tempFilePath := fs.generateTempPath(allocation, fileData, connectionID)
-	dest, err := os.Create(tempFilePath)
+	dest, err := NewChunkWriter(tempFilePath)
 	if err != nil {
 		return nil, common.NewError("file_creation_error", err.Error())
 	}
 	defer dest.Close()
-	// infile, err := hdr.Open()
-	// if err != nil {
-	// 	return nil, common.NewError("file_reading_error", err.Error())
-	// }
+
+	fileRef := &FileOutputData{}
+	var fileReader io.Reader = infile
+
+	if fileData.IsResumable {
+		h := sha1.New()
+		offset, err := dest.WriteChunk(context.TODO(), fileData.UploadOffset, io.TeeReader(fileReader, h))
+
+		if err != nil {
+			return nil, common.NewError("file_write_error", err.Error())
+		}
+
+		fileRef.ContentHash = hex.EncodeToString(h.Sum(nil))
+		fileRef.Size = dest.Size()
+		fileRef.Name = fileData.Name
+		fileRef.Path = fileData.Path
+		fileRef.UploadOffset = fileData.UploadOffset + offset
+		fileRef.UploadLength = fileData.UploadLength
+
+		if !fileData.IsFinal {
+			//skip to compute hash until the last chunk is uploaded
+			return fileRef, nil
+		}
+
+		fileReader = dest
+	}
+
+	h := sha1.New()
 	bytesBuffer := bytes.NewBuffer(nil)
-	//merkleHash := sha3.New256()
 	multiHashWriter := io.MultiWriter(h, bytesBuffer)
-	tReader := io.TeeReader(infile, multiHashWriter)
+	tReader := io.TeeReader(fileReader, multiHashWriter)
 	merkleHashes := make([]hash.Hash, 1024)
 	merkleLeaves := make([]util.Hashable, 1024)
 	for idx := range merkleHashes {
@@ -503,7 +527,15 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
 	}
 	fileSize := int64(0)
 	for {
-		written, err := io.CopyN(dest, tReader, CHUNK_SIZE)
+		var written int64
+
+		if fileData.IsResumable {
+			//all chunks have been written, only read bytes from local file , and compute hash
+			written, err = io.CopyN(ioutil.Discard, tReader, CHUNK_SIZE)
+		} else {
+			written, err = io.CopyN(dest, tReader, CHUNK_SIZE)
+		}
+
 		if err != io.EOF && err != nil {
 			return nil, common.NewError("file_write_error", err.Error())
 		}
@@ -519,7 +551,6 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
 			merkleHashes[offset].Write(dataBytes[i:end])
 		}
 
-		// merkleLeaves = append(merkleLeaves, util.NewStringHashable(hex.EncodeToString(merkleHash.Sum(nil))))
 		bytesBuffer.Reset()
 		if err != nil && err == io.EOF {
 			break
@@ -528,17 +559,21 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData,
 	for idx := range merkleHashes {
 		merkleLeaves[idx] = util.NewStringHashable(hex.EncodeToString(merkleHashes[idx].Sum(nil)))
 	}
-	//Logger.Info("File size", zap.Int64("file_size", fileSize))
+
 	var mt util.MerkleTreeI = &util.MerkleTree{}
 	mt.ComputeTree(merkleLeaves)
-	//Logger.Info("Calculated Merkle root", zap.String("merkle_root", mt.GetRoot()), zap.Int("merkle_leaf_count", len(merkleLeaves)))
-	fileRef := &FileOutputData{}
-	fileRef.ContentHash = hex.EncodeToString(h.Sum(nil))
+	//only update hash for whole file when it is not a resumable upload or is final chunk.
+	if !fileData.IsResumable || fileData.IsFinal {
+		fileRef.ContentHash = hex.EncodeToString(h.Sum(nil))
+	}
+
 	fileRef.Size = fileSize
 	fileRef.Name = fileData.Name
 	fileRef.Path = fileData.Path
 	fileRef.MerkleRoot = mt.GetRoot()
+	fileRef.UploadOffset = fileSize
+	fileRef.UploadLength = fileData.UploadLength
 
 	return fileRef, nil
 }
 
diff --git a/code/go/0chain.net/blobbercore/filestore/store.go b/code/go/0chain.net/blobbercore/filestore/store.go
index 3e2b718fa..6980b4346 100644
--- a/code/go/0chain.net/blobbercore/filestore/store.go
+++ b/code/go/0chain.net/blobbercore/filestore/store.go
@@ -14,6 +14,15 @@ type FileInputData struct {
 	Path    string
 	Hash    string
 	OnCloud bool
+
+	//IsResumable marks the request as a resumable upload
+	IsResumable bool
+	//UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
+	UploadLength int64
+	//UploadOffset indicates a byte offset within a resource. The value MUST be a non-negative integer.
+	UploadOffset int64
+	//IsFinal marks the request as carrying the final chunk
+	IsFinal bool
 }
 
 type FileOutputData struct {
@@ -22,6 +31,11 @@ type FileOutputData struct {
 	MerkleRoot  string
 	ContentHash string
 	Size        int64
+
+	//UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
+	UploadLength int64
+	//UploadOffset indicates a byte offset within a resource. The value MUST be a non-negative integer.
+	UploadOffset int64
 }
 
 type FileObjectHandler func(contentHash string, contentSize int64)
diff --git a/code/go/0chain.net/blobbercore/handler/dto.go b/code/go/0chain.net/blobbercore/handler/dto.go
index 2edcf2b26..98bc27aee 100644
--- a/code/go/0chain.net/blobbercore/handler/dto.go
+++ b/code/go/0chain.net/blobbercore/handler/dto.go
@@ -12,6 +12,11 @@ type UploadResult struct {
 	Size       int64  `json:"size"`
 	Hash       string `json:"content_hash"`
 	MerkleRoot string `json:"merkle_root"`
+
+	//UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
+	UploadLength int64 `json:"upload_length"`
+	//UploadOffset indicates a byte offset within a resource. The value MUST be a non-negative integer.
+	UploadOffset int64 `json:"upload_offset"`
 }
 
 type CommitResult struct {