From e3e86290635ae865b6988ab7a25cf8712cb7caaa Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 8 May 2021 15:45:18 +0800 Subject: [PATCH 1/9] fix(upload):added ChunkWriter feature on upload. #111 --- .../blobbercore/allocation/newfilechange.go | 27 ++-- .../blobbercore/filestore/chunk_writer.go | 115 ++++++++++++++++++ .../filestore/chunk_writer_test.go | 92 ++++++++++++++ .../blobbercore/filestore/fs_store.go | 53 +++++++- .../0chain.net/blobbercore/filestore/store.go | 14 +++ code/go/0chain.net/blobbercore/handler/dto.go | 5 + .../handler/object_operation_handler.go | 8 +- 7 files changed, 296 insertions(+), 18 deletions(-) create mode 100644 code/go/0chain.net/blobbercore/filestore/chunk_writer.go create mode 100644 code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go diff --git a/code/go/0chain.net/blobbercore/allocation/newfilechange.go b/code/go/0chain.net/blobbercore/allocation/newfilechange.go index 280e2cd6d..123ac8a0b 100644 --- a/code/go/0chain.net/blobbercore/allocation/newfilechange.go +++ b/code/go/0chain.net/blobbercore/allocation/newfilechange.go @@ -14,24 +14,33 @@ import ( ) type NewFileChange struct { - ConnectionID string `json:"connection_id" validation:"required"` - AllocationID string `json:"allocation_id"` - Filename string `json:"filename" validation:"required"` - ThumbnailFilename string `json:"thumbnail_filename"` - Path string `json:"filepath" validation:"required"` - Size int64 `json:"size"` + ConnectionID string `json:"connection_id,omitempty" validation:"required"` + AllocationID string `json:"allocation_id,omitempty"` + Filename string `json:"filename,omitempty" validation:"required"` + ThumbnailFilename string `json:"thumbnail_filename,omitempty"` + Path string `json:"filepath,omitempty" validation:"required"` + Size int64 `json:"size,omitempty"` Hash string `json:"content_hash,omitempty"` - ThumbnailSize int64 `json:"thumbnail_size"` + ThumbnailSize int64 `json:"thumbnail_size,omitempty"` ThumbnailHash string `json:"thumbnail_content_hash,omitempty"` MerkleRoot string `json:"merkle_root,omitempty"` ActualHash string `json:"actual_hash,omitempty" validation:"required"` ActualSize int64 `json:"actual_size,omitempty" validation:"required"` - ActualThumbnailSize int64 `json:"actual_thumb_size"` - ActualThumbnailHash string `json:"actual_thumb_hash"` + ActualThumbnailSize int64 `json:"actual_thumb_size,omitempty"` + ActualThumbnailHash string `json:"actual_thumb_hash,omitempty"` MimeType string `json:"mimetype,omitempty"` EncryptedKey string `json:"encrypted_key,omitempty"` CustomMeta string `json:"custom_meta,omitempty"` Attributes reference.Attributes `json:"attributes,omitempty"` + + //IsResumable the request is resumable upload + IsResumable bool `json:"is_resumable,omitempty"` + //UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. + UploadLength int64 `json:"upload_length,omitempty"` + //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. + UploadOffset int64 `json:"upload_offset,omitempty"` + //IsFinal the request is final chunk + IsFinal bool `json:"is_final,omitempty"` } func (nf *NewFileChange) ProcessChange(ctx context.Context, diff --git a/code/go/0chain.net/blobbercore/filestore/chunk_writer.go b/code/go/0chain.net/blobbercore/filestore/chunk_writer.go new file mode 100644 index 000000000..e1e027f03 --- /dev/null +++ b/code/go/0chain.net/blobbercore/filestore/chunk_writer.go @@ -0,0 +1,115 @@ +package filestore + +import ( + "context" + "errors" + "io" + "os" +) + +//ChunkWriter implements a chunk write that will append content to the file +type ChunkWriter struct { + file string + writer *os.File + reader *os.File + offset int64 + size int64 +} + +//NewChunkWriter create a ChunkWriter +func NewChunkWriter(file string) (*ChunkWriter, error) { + w := &ChunkWriter{ + file: file, + } + var f *os.File + fi, err := os.Stat(file) + if errors.Is(err, os.ErrNotExist) { + f, err = os.Create(file) + if err != nil { + return nil, err + } + } else { + f, err = os.OpenFile(file, os.O_RDONLY|os.O_CREATE|os.O_WRONLY, os.ModeAppend) + if err != nil { + return nil, err + } + + w.size = fi.Size() + w.offset = fi.Size() + } + + w.writer = f + + return w, nil +} + +//Write implements io.Writer +func (w *ChunkWriter) Write(b []byte) (n int, err error) { + if w == nil || w.writer == nil { + return 0, os.ErrNotExist + } + + written, err := w.writer.Write(b) + + w.size += int64(written) + + return written, err +} + +//Reader implements io.Reader +func (w *ChunkWriter) Read(p []byte) (n int, err error) { + if w == nil || w.reader == nil { + reader, err := os.Open(w.file) + + if err != nil { + return 0, err + } + + w.reader = reader + } + + return w.reader.Read(p) +} + +//WriteChunk append data to the file +func (w *ChunkWriter) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { + if w == nil || w.writer == nil { + return 0, os.ErrNotExist + } + + _, err := w.writer.Seek(offset, io.SeekStart) + + if err != nil { + return 0, err + } + + n, err := io.Copy(w.writer, src) + + w.offset += n + w.size += n + + return n, err +} + +//Size length in bytes for regular files +func (w *ChunkWriter) Size() int64 { + if w == nil { + return 0 + } + return w.size +} + +//Close closes the underline File +func (w *ChunkWriter) Close() { + if w == nil { + return + } + + if w.writer != nil { + w.writer.Close() + } + + if w.reader != nil { + w.reader.Close() + } +} diff --git a/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go b/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go new file mode 100644 index 000000000..621a5f54e --- /dev/null +++ b/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go @@ -0,0 +1,92 @@ +package filestore + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWrite(t *testing.T) { + + fileName := filepath.Join(os.TempDir(), "testwrite_"+strconv.FormatInt(time.Now().Unix(), 10)) + + content := "this is full content" + + w, err := NewChunkWriter(fileName) + if err != nil { + assert.Fail(t, "failed to create ChunkWriter", err) + return + } + + _, err = w.Write([]byte(content)) + + if err != nil { + assert.Fail(t, "failed to ChunkWriter.WriteChunk") + return + } + + buf := make([]byte, w.Size()) + + //read all lines from file + _, err = w.Read(buf) + if err != nil { + assert.Fail(t, "failed to ChunkWriter.Read", err) + return + } + + assert.Equal(t, content, string(buf), "File content should be same") +} + +func TestWriteChunk(t *testing.T) { + + chunk1 := "this is 1st chunked" + + tempFile, err := ioutil.TempFile("", "") + + if err != nil { + assert.Fail(t, "failed to create tempfile") + return + } + offset, err := tempFile.Write([]byte(chunk1)) + if err != nil { + assert.Fail(t, "failed to write first chunk to tempfile") + return + } + + fileName := tempFile.Name() + tempFile.Close() + + w, err := NewChunkWriter(fileName) + if err != nil { + assert.Fail(t, "failed to create ChunkWriter") + return + } + defer w.Close() + + chunk2 := "this is 2nd chunk" + + _, err = w.WriteChunk(context.TODO(), int64(offset), strings.NewReader(chunk2)) + + if err != nil { + assert.Fail(t, "failed to ChunkWriter.WriteChunk") + return + } + + buf := make([]byte, w.Size()) + + //read all lines from file + _, err = w.Read(buf) + if err != nil { + assert.Fail(t, "failed to ChunkWriter.Read", err) + return + } + + assert.Equal(t, chunk1+chunk2, string(buf), "File content should be same") +} diff --git a/code/go/0chain.net/blobbercore/filestore/fs_store.go b/code/go/0chain.net/blobbercore/filestore/fs_store.go index edb41a286..4a72c0e81 100644 --- a/code/go/0chain.net/blobbercore/filestore/fs_store.go +++ b/code/go/0chain.net/blobbercore/filestore/fs_store.go @@ -2,12 +2,14 @@ package filestore import ( "bytes" + "context" "crypto/sha1" "encoding/hex" "encoding/json" "fmt" "hash" "io" + "io/ioutil" "mime/multipart" "os" "path/filepath" @@ -481,21 +483,52 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, return nil, common.NewError("filestore_setup_error", "Error setting the fs store. "+err.Error()) } - h := sha1.New() tempFilePath := fs.generateTempPath(allocation, fileData, connectionID) - dest, err := os.Create(tempFilePath) + //dest, err := os.Create(tempFilePath) + dest, err := NewChunkWriter(tempFilePath) if err != nil { return nil, common.NewError("file_creation_error", err.Error()) } defer dest.Close() + // infile, err := hdr.Open() // if err != nil { // return nil, common.NewError("file_reading_error", err.Error()) // } + + fileRef := &FileOutputData{} + + var fileReader io.Reader = infile + + if fileData.IsResumable { + h := sha1.New() + offset, err := dest.WriteChunk(context.TODO(), fileData.UploadOffset, io.TeeReader(fileReader, h)) + + if err != nil { + return nil, common.NewError("file_write_error", err.Error()) + } + + fileRef.ContentHash = hex.EncodeToString(h.Sum(nil)) + fileRef.Size = dest.Size() + fileRef.Name = fileData.Name + fileRef.Path = fileData.Path + fileRef.UploadOffset = fileData.UploadOffset + offset + fileRef.UploadLength = fileData.UploadLength + + if !fileData.IsFinal { + //skip to compute hash until the last chunk is uploaded + return fileRef, nil + } + + fileReader = dest + } + + h := sha1.New() + bytesBuffer := bytes.NewBuffer(nil) //merkleHash := sha3.New256() multiHashWriter := io.MultiWriter(h, bytesBuffer) - tReader := io.TeeReader(infile, multiHashWriter) + tReader := io.TeeReader(fileReader, multiHashWriter) merkleHashes := make([]hash.Hash, 1024) merkleLeaves := make([]util.Hashable, 1024) for idx := range merkleHashes { @@ -503,7 +536,15 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, } fileSize := int64(0) for { - written, err := io.CopyN(dest, tReader, CHUNK_SIZE) + var written int64 + + if fileData.IsResumable { + //all chunks have been written, only read bytes from local file , and compute hash + written, err = io.CopyN(ioutil.Discard, tReader, CHUNK_SIZE) + } else { + written, err = io.CopyN(dest, tReader, CHUNK_SIZE) + } + if err != io.EOF && err != nil { return nil, common.NewError("file_write_error", err.Error()) } @@ -533,13 +574,15 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, mt.ComputeTree(merkleLeaves) //Logger.Info("Calculated Merkle root", zap.String("merkle_root", mt.GetRoot()), zap.Int("merkle_leaf_count", len(merkleLeaves))) - fileRef := &FileOutputData{} fileRef.ContentHash = hex.EncodeToString(h.Sum(nil)) fileRef.Size = fileSize fileRef.Name = fileData.Name fileRef.Path = fileData.Path fileRef.MerkleRoot = mt.GetRoot() + fileRef.UploadOffset = fileSize + fileRef.UploadLength = fileData.UploadLength + return fileRef, nil } diff --git a/code/go/0chain.net/blobbercore/filestore/store.go b/code/go/0chain.net/blobbercore/filestore/store.go index 3e2b718fa..6980b4346 100644 --- a/code/go/0chain.net/blobbercore/filestore/store.go +++ b/code/go/0chain.net/blobbercore/filestore/store.go @@ -14,6 +14,15 @@ type FileInputData struct { Path string Hash string OnCloud bool + + //IsResumable the request is resumable upload + IsResumable bool + //UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. + UploadLength int64 + //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. + UploadOffset int64 + //IsFinal the request is final chunk + IsFinal bool } type FileOutputData struct { @@ -22,6 +31,11 @@ type FileOutputData struct { MerkleRoot string ContentHash string Size int64 + + //UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. + UploadLength int64 + //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. + UploadOffset int64 } type FileObjectHandler func(contentHash string, contentSize int64) diff --git a/code/go/0chain.net/blobbercore/handler/dto.go b/code/go/0chain.net/blobbercore/handler/dto.go index 2edcf2b26..bf09c0ce7 100644 --- a/code/go/0chain.net/blobbercore/handler/dto.go +++ b/code/go/0chain.net/blobbercore/handler/dto.go @@ -12,6 +12,11 @@ type UploadResult struct { Size int64 `json:"size"` Hash string `json:"content_hash"` MerkleRoot string `json:"merkle_root"` + + //UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. + UploadLength int64 + //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. + UploadOffset int64 } type CommitResult struct { diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 59e6ddd26..24abfcf2e 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -970,7 +970,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl "Invalid parameters. Error parsing the meta data for upload."+err.Error()) } exisitingFileRef := fsh.checkIfFileAlreadyExists(ctx, allocationID, formData.Path) - existingFileRefSize := int64(0) + exisitingFileRefSize := int64(0) exisitingFileOnCloud := false if mode == allocation.INSERT_OPERATION { if allocationObj.OwnerID != clientID && allocationObj.PayerID != clientID { @@ -993,7 +993,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl } if exisitingFileRef != nil { - existingFileRefSize = exisitingFileRef.Size + exisitingFileRefSize = exisitingFileRef.Size exisitingFileOnCloud = exisitingFileRef.OnCloud } @@ -1051,13 +1051,13 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl formData.ThumbnailFilename = thumbInputData.Name } - if allocationObj.BlobberSizeUsed+(allocationSize-existingFileRefSize) > allocationObj.BlobberSize { + if allocationObj.BlobberSizeUsed+(allocationSize-exisitingFileRefSize) > allocationObj.BlobberSize { return nil, common.NewError("max_allocation_size", "Max size reached for the allocation with this blobber") } allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ConnectionID - allocationChange.Size = allocationSize - existingFileRefSize + allocationChange.Size = allocationSize - exisitingFileRefSize allocationChange.Operation = mode connectionObj.Size += allocationChange.Size From c032c4c9b93063d75e9dc1e1325f105aa81b5093 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 8 May 2021 15:55:27 +0800 Subject: [PATCH 2/9] fix(upload):#111 Updated json tags on UploadResult and NewFileChange --- .../blobbercore/allocation/newfilechange.go | 18 +++++++++--------- code/go/0chain.net/blobbercore/handler/dto.go | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/newfilechange.go b/code/go/0chain.net/blobbercore/allocation/newfilechange.go index 123ac8a0b..7a41b2049 100644 --- a/code/go/0chain.net/blobbercore/allocation/newfilechange.go +++ b/code/go/0chain.net/blobbercore/allocation/newfilechange.go @@ -14,20 +14,20 @@ import ( ) type NewFileChange struct { - ConnectionID string `json:"connection_id,omitempty" validation:"required"` - AllocationID string `json:"allocation_id,omitempty"` - Filename string `json:"filename,omitempty" validation:"required"` - ThumbnailFilename string `json:"thumbnail_filename,omitempty"` - Path string `json:"filepath,omitempty" validation:"required"` - Size int64 `json:"size,omitempty"` + ConnectionID string `json:"connection_id" validation:"required"` + AllocationID string `json:"allocation_id"` + Filename string `json:"filename" validation:"required"` + ThumbnailFilename string `json:"thumbnail_filename"` + Path string `json:"filepath" validation:"required"` + Size int64 `json:"size"` Hash string `json:"content_hash,omitempty"` - ThumbnailSize int64 `json:"thumbnail_size,omitempty"` + ThumbnailSize int64 `json:"thumbnail_size"` ThumbnailHash string `json:"thumbnail_content_hash,omitempty"` MerkleRoot string `json:"merkle_root,omitempty"` ActualHash string `json:"actual_hash,omitempty" validation:"required"` ActualSize int64 `json:"actual_size,omitempty" validation:"required"` - ActualThumbnailSize int64 `json:"actual_thumb_size,omitempty"` - ActualThumbnailHash string `json:"actual_thumb_hash,omitempty"` + ActualThumbnailSize int64 `json:"actual_thumb_size"` + ActualThumbnailHash string `json:"actual_thumb_hash"` MimeType string `json:"mimetype,omitempty"` EncryptedKey string `json:"encrypted_key,omitempty"` CustomMeta string `json:"custom_meta,omitempty"` diff --git a/code/go/0chain.net/blobbercore/handler/dto.go b/code/go/0chain.net/blobbercore/handler/dto.go index bf09c0ce7..98bc27aee 100644 --- a/code/go/0chain.net/blobbercore/handler/dto.go +++ b/code/go/0chain.net/blobbercore/handler/dto.go @@ -14,9 +14,9 @@ type UploadResult struct { MerkleRoot string `json:"merkle_root"` //UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. - UploadLength int64 + UploadLength int64 `json:"upload_length"` //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. - UploadOffset int64 + UploadOffset int64 `json:"upload_offset"` } type CommitResult struct { From 82267ac531731b441907a0933808d7ca65755438 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 10 May 2021 11:07:21 +0800 Subject: [PATCH 3/9] fix(upload):#111 update ContextHash for final chunk upload --- code/go/0chain.net/blobbercore/filestore/fs_store.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/code/go/0chain.net/blobbercore/filestore/fs_store.go b/code/go/0chain.net/blobbercore/filestore/fs_store.go index 4a72c0e81..877af342f 100644 --- a/code/go/0chain.net/blobbercore/filestore/fs_store.go +++ b/code/go/0chain.net/blobbercore/filestore/fs_store.go @@ -574,7 +574,11 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, mt.ComputeTree(merkleLeaves) //Logger.Info("Calculated Merkle root", zap.String("merkle_root", mt.GetRoot()), zap.Int("merkle_leaf_count", len(merkleLeaves))) - fileRef.ContentHash = hex.EncodeToString(h.Sum(nil)) + //only update hash for whole file when it is not a resumable upload. + if !fileData.IsResumable { + fileRef.ContentHash = hex.EncodeToString(h.Sum(nil)) + } + fileRef.Size = fileSize fileRef.Name = fileData.Name fileRef.Path = fileData.Path From 6dd18b9c45923546cbc890f3310258c7e8931a9b Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 10 May 2021 21:18:59 +0800 Subject: [PATCH 4/9] fix(upload):#111 use whole file hash as reqquest hash whent it is final chunk --- code/go/0chain.net/blobbercore/filestore/fs_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/go/0chain.net/blobbercore/filestore/fs_store.go b/code/go/0chain.net/blobbercore/filestore/fs_store.go index 877af342f..010f7e061 100644 --- a/code/go/0chain.net/blobbercore/filestore/fs_store.go +++ b/code/go/0chain.net/blobbercore/filestore/fs_store.go @@ -574,8 +574,8 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, mt.ComputeTree(merkleLeaves) //Logger.Info("Calculated Merkle root", zap.String("merkle_root", mt.GetRoot()), zap.Int("merkle_leaf_count", len(merkleLeaves))) - //only update hash for whole file when it is not a resumable upload. - if !fileData.IsResumable { + //only update hash for whole file when it is not a resumable upload or is final chunk. + if !fileData.IsResumable || fileData.IsFinal { fileRef.ContentHash = hex.EncodeToString(h.Sum(nil)) } From 1f7d9b7b2fdd0eae764126eada3ab4e68c800f01 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 10 May 2021 21:55:04 +0800 Subject: [PATCH 5/9] fix(upload):#111 pass resume parameter to FileInputData --- .../blobbercore/handler/object_operation_handler.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 24abfcf2e..85b1d87b5 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -1010,7 +1010,16 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl defer thumbfile.Close() } - fileInputData := &filestore.FileInputData{Name: formData.Filename, Path: formData.Path, OnCloud: exisitingFileOnCloud} + fileInputData := &filestore.FileInputData{ + Name: formData.Filename, + Path: formData.Path, + OnCloud: exisitingFileOnCloud, + + IsResumable: formData.IsResumable, + UploadOffset: formData.UploadOffset, + UploadLength: formData.UploadLength, + IsFinal: formData.IsFinal, + } fileOutputData, err := filestore.GetFileStore().WriteFile(allocationID, fileInputData, origfile, connectionObj.ConnectionID) if err != nil { return nil, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) From 14a3e55e1335a9c491e4c99df4fe454a3a5f9963 Mon Sep 17 00:00:00 2001 From: Lz Date: Thu, 13 May 2021 06:37:26 +0800 Subject: [PATCH 6/9] fix(upload):#111 format some code for review --- .../blobbercore/allocation/newfilechange.go | 8 ++++---- .../blobbercore/filestore/chunk_writer_test.go | 17 +++++++++-------- .../blobbercore/filestore/fs_store.go | 13 +++---------- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/newfilechange.go b/code/go/0chain.net/blobbercore/allocation/newfilechange.go index 7a41b2049..346ea89fb 100644 --- a/code/go/0chain.net/blobbercore/allocation/newfilechange.go +++ b/code/go/0chain.net/blobbercore/allocation/newfilechange.go @@ -33,13 +33,13 @@ type NewFileChange struct { CustomMeta string `json:"custom_meta,omitempty"` Attributes reference.Attributes `json:"attributes,omitempty"` - //IsResumable the request is resumable upload + // IsResumable the request is resumable upload IsResumable bool `json:"is_resumable,omitempty"` - //UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. + // UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. UploadLength int64 `json:"upload_length,omitempty"` - //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. + // Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. UploadOffset int64 `json:"upload_offset,omitempty"` - //IsFinal the request is final chunk + // IsFinal the request is final chunk IsFinal bool `json:"is_final,omitempty"` } diff --git a/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go b/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go index 621a5f54e..716dc3189 100644 --- a/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go +++ b/code/go/0chain.net/blobbercore/filestore/chunk_writer_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWrite(t *testing.T) { @@ -21,14 +22,14 @@ func TestWrite(t *testing.T) { w, err := NewChunkWriter(fileName) if err != nil { - assert.Fail(t, "failed to create ChunkWriter", err) + require.Error(t, err, "failed to create ChunkWriter") return } _, err = w.Write([]byte(content)) if err != nil { - assert.Fail(t, "failed to ChunkWriter.WriteChunk") + require.Error(t, err, "failed to ChunkWriter.WriteChunk") return } @@ -37,7 +38,7 @@ func TestWrite(t *testing.T) { //read all lines from file _, err = w.Read(buf) if err != nil { - assert.Fail(t, "failed to ChunkWriter.Read", err) + require.Error(t, err, "failed to ChunkWriter.Read") return } @@ -51,12 +52,12 @@ func TestWriteChunk(t *testing.T) { tempFile, err := ioutil.TempFile("", "") if err != nil { - assert.Fail(t, "failed to create tempfile") + require.Error(t, err, "failed to create tempfile") return } offset, err := tempFile.Write([]byte(chunk1)) if err != nil { - assert.Fail(t, "failed to write first chunk to tempfile") + require.Error(t, err, "failed to write first chunk to tempfile") return } @@ -65,7 +66,7 @@ func TestWriteChunk(t *testing.T) { w, err := NewChunkWriter(fileName) if err != nil { - assert.Fail(t, "failed to create ChunkWriter") + require.Error(t, err, "failed to create ChunkWriter") return } defer w.Close() @@ -75,7 +76,7 @@ func TestWriteChunk(t *testing.T) { _, err = w.WriteChunk(context.TODO(), int64(offset), strings.NewReader(chunk2)) if err != nil { - assert.Fail(t, "failed to ChunkWriter.WriteChunk") + require.Error(t, err, "failed to ChunkWriter.WriteChunk") return } @@ -84,7 +85,7 @@ func TestWriteChunk(t *testing.T) { //read all lines from file _, err = w.Read(buf) if err != nil { - assert.Fail(t, "failed to ChunkWriter.Read", err) + require.Error(t, err, "failed to ChunkWriter.Read") return } diff --git a/code/go/0chain.net/blobbercore/filestore/fs_store.go b/code/go/0chain.net/blobbercore/filestore/fs_store.go index 010f7e061..9033e5585 100644 --- a/code/go/0chain.net/blobbercore/filestore/fs_store.go +++ b/code/go/0chain.net/blobbercore/filestore/fs_store.go @@ -484,18 +484,13 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, } tempFilePath := fs.generateTempPath(allocation, fileData, connectionID) - //dest, err := os.Create(tempFilePath) + dest, err := NewChunkWriter(tempFilePath) if err != nil { return nil, common.NewError("file_creation_error", err.Error()) } defer dest.Close() - // infile, err := hdr.Open() - // if err != nil { - // return nil, common.NewError("file_reading_error", err.Error()) - // } - fileRef := &FileOutputData{} var fileReader io.Reader = infile @@ -526,7 +521,7 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, h := sha1.New() bytesBuffer := bytes.NewBuffer(nil) - //merkleHash := sha3.New256() + multiHashWriter := io.MultiWriter(h, bytesBuffer) tReader := io.TeeReader(fileReader, multiHashWriter) merkleHashes := make([]hash.Hash, 1024) @@ -560,7 +555,6 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, merkleHashes[offset].Write(dataBytes[i:end]) } - // merkleLeaves = append(merkleLeaves, util.NewStringHashable(hex.EncodeToString(merkleHash.Sum(nil)))) bytesBuffer.Reset() if err != nil && err == io.EOF { break @@ -569,10 +563,9 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, for idx := range merkleHashes { merkleLeaves[idx] = util.NewStringHashable(hex.EncodeToString(merkleHashes[idx].Sum(nil))) } - //Logger.Info("File size", zap.Int64("file_size", fileSize)) + var mt util.MerkleTreeI = &util.MerkleTree{} mt.ComputeTree(merkleLeaves) - //Logger.Info("Calculated Merkle root", zap.String("merkle_root", mt.GetRoot()), zap.Int("merkle_leaf_count", len(merkleLeaves))) //only update hash for whole file when it is not a resumable upload or is final chunk. if !fileData.IsResumable || fileData.IsFinal { From 00d794bbc1a20e8647cd8df2eec825c94b9f7b10 Mon Sep 17 00:00:00 2001 From: Lz Date: Thu, 13 May 2021 06:52:23 +0800 Subject: [PATCH 7/9] fix(upload):#111 just removed some blank lines --- code/go/0chain.net/blobbercore/filestore/fs_store.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/code/go/0chain.net/blobbercore/filestore/fs_store.go b/code/go/0chain.net/blobbercore/filestore/fs_store.go index 9033e5585..9ce06d8d6 100644 --- a/code/go/0chain.net/blobbercore/filestore/fs_store.go +++ b/code/go/0chain.net/blobbercore/filestore/fs_store.go @@ -484,7 +484,6 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, } tempFilePath := fs.generateTempPath(allocation, fileData, connectionID) - dest, err := NewChunkWriter(tempFilePath) if err != nil { return nil, common.NewError("file_creation_error", err.Error()) @@ -492,7 +491,6 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, defer dest.Close() fileRef := &FileOutputData{} - var fileReader io.Reader = infile if fileData.IsResumable { @@ -519,9 +517,7 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, } h := sha1.New() - bytesBuffer := bytes.NewBuffer(nil) - multiHashWriter := io.MultiWriter(h, bytesBuffer) tReader := io.TeeReader(fileReader, multiHashWriter) merkleHashes := make([]hash.Hash, 1024) @@ -576,7 +572,6 @@ func (fs *FileFSStore) WriteFile(allocationID string, fileData *FileInputData, fileRef.Name = fileData.Name fileRef.Path = fileData.Path fileRef.MerkleRoot = mt.GetRoot() - fileRef.UploadOffset = fileSize fileRef.UploadLength = fileData.UploadLength From afe8b1c9e0d4809190986d9243e98d50a311ff26 Mon Sep 17 00:00:00 2001 From: Lz Date: Fri, 14 May 2021 11:21:20 +0800 Subject: [PATCH 8/9] fix(upload):#111 revert typo mistake --- .../handler/object_operation_handler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 85b1d87b5..def111d90 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -969,7 +969,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl return nil, common.NewError("invalid_parameters", "Invalid parameters. Error parsing the meta data for upload."+err.Error()) } - exisitingFileRef := fsh.checkIfFileAlreadyExists(ctx, allocationID, formData.Path) + existingFileRefSize := fsh.checkIfFileAlreadyExists(ctx, allocationID, formData.Path) exisitingFileRefSize := int64(0) exisitingFileOnCloud := false if mode == allocation.INSERT_OPERATION { @@ -977,24 +977,24 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation") } - if exisitingFileRef != nil { + if existingFileRefSize != nil { return nil, common.NewError("duplicate_file", "File at path already exists") } } else if mode == allocation.UPDATE_OPERATION { - if exisitingFileRef == nil { + if existingFileRefSize == nil { return nil, common.NewError("invalid_file_update", "File at path does not exist for update") } if allocationObj.OwnerID != clientID && allocationObj.PayerID != clientID && - !reference.IsACollaborator(ctx, exisitingFileRef.ID, clientID) { + !reference.IsACollaborator(ctx, existingFileRefSize.ID, clientID) { return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner, collaborator or the payer of the allocation") } } - if exisitingFileRef != nil { - exisitingFileRefSize = exisitingFileRef.Size - exisitingFileOnCloud = exisitingFileRef.OnCloud + if existingFileRefSize != nil { + exisitingFileRefSize = existingFileRefSize.Size + exisitingFileOnCloud = existingFileRefSize.OnCloud } origfile, _, err := r.FormFile("uploadFile") From b2eb7a57c56e068588e0e3a6ae2cbd89a2b11ecf Mon Sep 17 00:00:00 2001 From: Lz Date: Fri, 14 May 2021 11:23:20 +0800 Subject: [PATCH 9/9] fix(upload):#111 revert typo mistake --- .../handler/object_operation_handler.go | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index def111d90..59e6ddd26 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -969,32 +969,32 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl return nil, common.NewError("invalid_parameters", "Invalid parameters. Error parsing the meta data for upload."+err.Error()) } - existingFileRefSize := fsh.checkIfFileAlreadyExists(ctx, allocationID, formData.Path) - exisitingFileRefSize := int64(0) + exisitingFileRef := fsh.checkIfFileAlreadyExists(ctx, allocationID, formData.Path) + existingFileRefSize := int64(0) exisitingFileOnCloud := false if mode == allocation.INSERT_OPERATION { if allocationObj.OwnerID != clientID && allocationObj.PayerID != clientID { return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation") } - if existingFileRefSize != nil { + if exisitingFileRef != nil { return nil, common.NewError("duplicate_file", "File at path already exists") } } else if mode == allocation.UPDATE_OPERATION { - if existingFileRefSize == nil { + if exisitingFileRef == nil { return nil, common.NewError("invalid_file_update", "File at path does not exist for update") } if allocationObj.OwnerID != clientID && allocationObj.PayerID != clientID && - !reference.IsACollaborator(ctx, existingFileRefSize.ID, clientID) { + !reference.IsACollaborator(ctx, exisitingFileRef.ID, clientID) { return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner, collaborator or the payer of the allocation") } } - if existingFileRefSize != nil { - exisitingFileRefSize = existingFileRefSize.Size - exisitingFileOnCloud = existingFileRefSize.OnCloud + if exisitingFileRef != nil { + existingFileRefSize = exisitingFileRef.Size + exisitingFileOnCloud = exisitingFileRef.OnCloud } origfile, _, err := r.FormFile("uploadFile") @@ -1010,16 +1010,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl defer thumbfile.Close() } - fileInputData := &filestore.FileInputData{ - Name: formData.Filename, - Path: formData.Path, - OnCloud: exisitingFileOnCloud, - - IsResumable: formData.IsResumable, - UploadOffset: formData.UploadOffset, - UploadLength: formData.UploadLength, - IsFinal: formData.IsFinal, - } + fileInputData := &filestore.FileInputData{Name: formData.Filename, Path: formData.Path, OnCloud: exisitingFileOnCloud} fileOutputData, err := filestore.GetFileStore().WriteFile(allocationID, fileInputData, origfile, connectionObj.ConnectionID) if err != nil { return nil, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) @@ -1060,13 +1051,13 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*Upl formData.ThumbnailFilename = thumbInputData.Name } - if allocationObj.BlobberSizeUsed+(allocationSize-exisitingFileRefSize) > allocationObj.BlobberSize { + if allocationObj.BlobberSizeUsed+(allocationSize-existingFileRefSize) > allocationObj.BlobberSize { return nil, common.NewError("max_allocation_size", "Max size reached for the allocation with this blobber") } allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ConnectionID - allocationChange.Size = allocationSize - exisitingFileRefSize + allocationChange.Size = allocationSize - existingFileRefSize allocationChange.Operation = mode connectionObj.Size += allocationChange.Size