From 1c62550e312ea32ec7fdc26322c00ff20004684d Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sun, 2 Jun 2024 14:40:49 +0530 Subject: [PATCH 01/10] add timing logs for batch upload --- .../blobbercore/handler/file_command_upload.go | 10 +++++++--- .../blobbercore/handler/object_operation_handler.go | 7 ++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index a98f39576..40524e6b2 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -8,6 +8,7 @@ import ( "mime/multipart" "net/http" "path/filepath" + "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -123,7 +124,6 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request // ProcessContent flush file to FileStorage func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj *allocation.Allocation) (allocation.UploadResult, error) { - logging.Logger.Info("UploadFileCommand.ProcessContent", zap.Any("fileChanger", cmd.fileChanger.Path), zap.Any("uploadOffset", cmd.fileChanger.UploadOffset), zap.Any("isFinal", cmd.fileChanger.IsFinal)) result := allocation.UploadResult{} defer cmd.contentFile.Close() @@ -139,18 +139,21 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj FilePathHash: cmd.fileChanger.PathHash, Size: cmd.fileChanger.Size, } + now := time.Now() fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionID, fileInputData, cmd.contentFile) if err != nil { logging.Logger.Error("UploadFileCommand.ProcessContent", zap.Error(err)) return result, common.NewError("upload_error", "Failed to write file. "+err.Error()) } - + elapsedWriteFile := time.Since(now) + now = time.Now() result.Filename = cmd.fileChanger.Filename result.ValidationRoot = fileOutputData.ValidationRoot result.Size = fileOutputData.Size allocationSize := allocation.GetConnectionObjSize(connectionID) - + elapsedGetConnectionObjSize := time.Since(now) + now = time.Now() cmd.fileChanger.AllocationID = allocationObj.ID cmd.allocationChange = &allocation.AllocationChange{} @@ -193,6 +196,7 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj return result, common.NewError("max_allocation_size", "Max size reached for the allocation with this blobber") } + logging.Logger.Info("UploadFileCommand: ", zap.Duration("elapsedWriteFile", elapsedWriteFile), zap.Duration("elapsedGetConnectionObjSize", elapsedGetConnectionObjSize), zap.Duration("elapsedSaveChange", time.Since(now))) return result, nil } diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index f5dbe5ccc..371d00b08 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -1278,12 +1278,15 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all } elapsedVerifySig := time.Since(st) + st = time.Now() cmd := createFileCommand(r) err = cmd.IsValidated(ctx, r, allocationObj, clientID) if err != nil { return nil, err } + elapsedIsValidated := time.Since(st) + st = time.Now() // call process content, which writes to file checks if conn obj needs to be updated and if commit hasher needs to be called res, err := cmd.ProcessContent(ctx, allocationObj) if err != nil { @@ -1305,6 +1308,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all if blocks > 0 { go AddUploadedData(clientID, blocks) } + elapsedProcessContent := time.Since(st) Logger.Info("[upload]elapsed", zap.String("alloc_id", allocationID), zap.String("file", cmd.GetPath()), @@ -1312,7 +1316,8 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all zap.Duration("get_processor", elapsedGetConnectionProcessor), zap.Duration("get_alloc", elapsedAllocation), zap.Duration("sig", elapsedVerifySig), - zap.Duration("validate", time.Since(st)), + zap.Duration("validate", elapsedIsValidated), + zap.Duration("process_content", elapsedProcessContent), zap.Duration("total", time.Since(startTime))) return &res, nil } From 2143a6ed92cb9fb612b1c9573595df437707cb85 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sun, 2 Jun 2024 18:34:43 +0530 Subject: [PATCH 02/10] use another txn for save change --- .../allocation/allocationchange.go | 2 +- .../blobbercore/allocation/connection.go | 22 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/allocationchange.go b/code/go/0chain.net/blobbercore/allocation/allocationchange.go index 52505a8f8..e24714b29 100644 --- a/code/go/0chain.net/blobbercore/allocation/allocationchange.go +++ b/code/go/0chain.net/blobbercore/allocation/allocationchange.go @@ -135,7 +135,7 @@ func GetAllocationChanges(ctx context.Context, connectionID, allocationID, clien allocationID, clientID, DeletedConnection, - ).Preload("Changes").First(cc).Error + ).Preload("Changes").Take(cc).Error if err == nil { cc.ComputeProperties() diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index febe4beab..6c9343c5d 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/seqpriorityqueue" @@ -165,7 +166,8 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) { connectionObj.UpdatedAt = time.Now() } -func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string, cmd FileCommand, isFinal bool, contentSize, offset, dataWritten, addSize int64) (bool, error) { +func SaveFileChange(_ context.Context, connectionID, pathHash, fileName string, cmd FileCommand, isFinal bool, contentSize, offset, dataWritten, addSize int64) (bool, error) { + now := time.Now() connectionObjMutex.RLock() connectionObj := connectionProcessor[connectionID] connectionObjMutex.RUnlock() @@ -176,23 +178,28 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string connectionObj.UpdatedAt = time.Now() change := connectionObj.changes[pathHash] saveChange := false + var elapsedChange time.Duration if change == nil { + changeTime := time.Now() change = &ConnectionChange{} connectionObj.changes[pathHash] = change - dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) - if err != nil { - return saveChange, err - } - err = cmd.UpdateChange(ctx, dbConnectionObj) + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) + if err != nil { + return err + } + return cmd.UpdateChange(ctx, dbConnectionObj) + }) if err != nil { connectionObj.lock.Unlock() - return saveChange, err + return false, err } hasher := filestore.GetNewCommitHasher(contentSize) change.hasher = hasher change.seqPQ = seqpriorityqueue.NewSeqPriorityQueue(contentSize) go hasher.Start(connectionObj.ctx, connectionID, connectionObj.AllocationID, fileName, pathHash, change.seqPQ) saveChange = true + elapsedChange = time.Since(changeTime) } connectionObj.lock.Unlock() change.lock.Lock() @@ -216,6 +223,7 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string DataBytes: dataWritten, }) } + logging.Logger.Info("saveFileChange: ", zap.Duration("elapsedChange", elapsedChange), zap.Duration("total", time.Since(now))) return saveChange, nil } From 2e43cdab75c8a66353e211a846a7e939e1eecbbe Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sun, 2 Jun 2024 19:35:58 +0530 Subject: [PATCH 03/10] add save change timing --- .../blobbercore/allocation/connection.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 6c9343c5d..4986147a8 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/seqpriorityqueue" @@ -166,7 +165,7 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) { connectionObj.UpdatedAt = time.Now() } -func SaveFileChange(_ context.Context, connectionID, pathHash, fileName string, cmd FileCommand, isFinal bool, contentSize, offset, dataWritten, addSize int64) (bool, error) { +func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string, cmd FileCommand, isFinal bool, contentSize, offset, dataWritten, addSize int64) (bool, error) { now := time.Now() connectionObjMutex.RLock() connectionObj := connectionProcessor[connectionID] @@ -183,16 +182,14 @@ func SaveFileChange(_ context.Context, connectionID, pathHash, fileName string, changeTime := time.Now() change = &ConnectionChange{} connectionObj.changes[pathHash] = change - err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { - dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) - if err != nil { - return err - } - return cmd.UpdateChange(ctx, dbConnectionObj) - }) + dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) + if err != nil { + return saveChange, err + } + err = cmd.UpdateChange(ctx, dbConnectionObj) if err != nil { connectionObj.lock.Unlock() - return false, err + return saveChange, err } hasher := filestore.GetNewCommitHasher(contentSize) change.hasher = hasher From a32a8c3ab4132dd6286ea2d95ebbeed63d1ac4d7 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sun, 2 Jun 2024 19:42:12 +0530 Subject: [PATCH 04/10] unlock faster --- .../0chain.net/blobbercore/allocation/connection.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 4986147a8..1516a589a 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -182,13 +182,15 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string changeTime := time.Now() change = &ConnectionChange{} connectionObj.changes[pathHash] = change + change.lock.Lock() + defer change.lock.Unlock() + connectionObj.lock.Unlock() dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) if err != nil { return saveChange, err } err = cmd.UpdateChange(ctx, dbConnectionObj) if err != nil { - connectionObj.lock.Unlock() return saveChange, err } hasher := filestore.GetNewCommitHasher(contentSize) @@ -197,10 +199,11 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string go hasher.Start(connectionObj.ctx, connectionID, connectionObj.AllocationID, fileName, pathHash, change.seqPQ) saveChange = true elapsedChange = time.Since(changeTime) + } else { + connectionObj.lock.Unlock() + change.lock.Lock() + defer change.lock.Unlock() } - connectionObj.lock.Unlock() - change.lock.Lock() - defer change.lock.Unlock() if change.isFinalized { return false, nil } From 5bbb279cba30e457f5e6b0f81c268b5b6c3ece40 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Mon, 3 Jun 2024 00:37:22 +0530 Subject: [PATCH 05/10] fix lock ordering --- code/go/0chain.net/blobbercore/allocation/connection.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 8fa4bb03f..753f5f3b3 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -185,9 +185,6 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string change.lock.Lock() defer change.lock.Unlock() connectionObj.lock.Unlock() - change.lock.Lock() - defer change.lock.Unlock() - connectionObj.lock.Unlock() dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) if err != nil { return saveChange, err @@ -203,9 +200,9 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string saveChange = true elapsedChange = time.Since(changeTime) } else { - connectionObj.lock.Unlock() change.lock.Lock() defer change.lock.Unlock() + connectionObj.lock.Unlock() } if change.isFinalized { return false, nil From 5c12961e5a240f0345523cdaa9700e6b138cbcda Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Mon, 3 Jun 2024 12:31:34 +0530 Subject: [PATCH 06/10] optimize add change --- .../0chain.net/blobbercore/allocation/connection.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 753f5f3b3..635830cf2 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -184,12 +184,15 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string connectionObj.changes[pathHash] = change change.lock.Lock() defer change.lock.Unlock() - connectionObj.lock.Unlock() - dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) - if err != nil { - return saveChange, err + allocationChangeCollector := &AllocationChangeCollector{ + AllocationID: connectionObj.AllocationID, + ID: connectionID, + ClientID: connectionObj.ClientID, + Size: connectionObj.Size, + Status: InProgressConnection, } - err = cmd.UpdateChange(ctx, dbConnectionObj) + connectionObj.lock.Unlock() + err := cmd.UpdateChange(ctx, allocationChangeCollector) if err != nil { return saveChange, err } From 9b75152eb850d07fd7e5a9c0472f1ece7b218ad6 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Mon, 3 Jun 2024 16:08:22 +0530 Subject: [PATCH 07/10] create connection change --- .../blobbercore/allocation/allocationchange.go | 5 +++++ code/go/0chain.net/blobbercore/allocation/connection.go | 9 +-------- .../blobbercore/allocation/file_changer_base.go | 3 +++ .../blobbercore/handler/file_command_delete.go | 6 ++++++ .../blobbercore/handler/file_command_update.go | 6 ++++++ .../blobbercore/handler/file_command_upload.go | 8 +++++++- 6 files changed, 28 insertions(+), 9 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/allocationchange.go b/code/go/0chain.net/blobbercore/allocation/allocationchange.go index e24714b29..ea6c60975 100644 --- a/code/go/0chain.net/blobbercore/allocation/allocationchange.go +++ b/code/go/0chain.net/blobbercore/allocation/allocationchange.go @@ -97,6 +97,11 @@ func (change *AllocationChange) Save(ctx context.Context) error { return db.Save(change).Error } +func (change *AllocationChange) Create(ctx context.Context) error { + db := datastore.GetStore().GetTransaction(ctx) + return db.Create(change).Error +} + func ParseAffectedFilePath(input string) (string, error) { inputMap := make(map[string]interface{}) err := json.Unmarshal([]byte(input), &inputMap) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 635830cf2..633011a05 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -184,15 +184,8 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string connectionObj.changes[pathHash] = change change.lock.Lock() defer change.lock.Unlock() - allocationChangeCollector := &AllocationChangeCollector{ - AllocationID: connectionObj.AllocationID, - ID: connectionID, - ClientID: connectionObj.ClientID, - Size: connectionObj.Size, - Status: InProgressConnection, - } connectionObj.lock.Unlock() - err := cmd.UpdateChange(ctx, allocationChangeCollector) + err := cmd.AddChange(ctx) if err != nil { return saveChange, err } diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go index fb5515ab9..3bc327820 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go @@ -96,6 +96,9 @@ type FileCommand interface { // UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error + // AddChange add Allocation change to db + AddChange(ctx context.Context) error + //NumBlocks return number of blocks uploaded by the client GetNumBlocks() int64 } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_delete.go b/code/go/0chain.net/blobbercore/handler/file_command_delete.go index 5508fb878..9287ebe45 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_delete.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_delete.go @@ -67,6 +67,12 @@ func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context, connectionObj *a return connectionObj.Save(ctx) } +func (cmd *DeleteFileCommand) AddChange(ctx context.Context) error { + connectionInput, _ := cmd.changeProcessor.Marshal() + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Create(ctx) +} + // ProcessContent flush file to FileStorage func (cmd *DeleteFileCommand) ProcessContent(_ context.Context, allocationObj *allocation.Allocation) (allocation.UploadResult, error) { deleteSize := cmd.existingFileRef.Size diff --git a/code/go/0chain.net/blobbercore/handler/file_command_update.go b/code/go/0chain.net/blobbercore/handler/file_command_update.go index e798d7dee..cf7973d10 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_update.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_update.go @@ -231,6 +231,12 @@ func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context, connectionObj *a return connectionObj.Save(ctx) } +func (cmd *UpdateFileCommand) AddChange(ctx context.Context) error { + connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Create(ctx) +} + func (cmd *UpdateFileCommand) GetNumBlocks() int64 { if cmd.fileChanger.IsFinal { return int64(math.Ceil(float64(cmd.fileChanger.Size*1.0) / float64(cmd.fileChanger.ChunkSize))) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index 40524e6b2..0e5a09994 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -237,7 +237,7 @@ func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *a if c.Operation != constants.FileOperationInsert || cmd.fileChanger.Path != filePath { continue } - c.Size = connectionObj.Size + c.Size = cmd.fileChanger.Size c.Input, _ = cmd.fileChanger.Marshal() //c.ModelWithTS.UpdatedAt = time.Now() @@ -255,6 +255,12 @@ func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *a return connectionObj.Save(ctx) } +func (cmd *UploadFileCommand) AddChange(ctx context.Context) error { + connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Create(ctx) +} + func (cmd *UploadFileCommand) GetNumBlocks() int64 { if cmd.fileChanger.IsFinal { return int64(math.Ceil(float64(cmd.fileChanger.Size*1.0) / float64(cmd.fileChanger.ChunkSize))) From f85ff16fc705ff3ecd506f011e54f134ebbdfdff Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Mon, 3 Jun 2024 17:57:31 +0530 Subject: [PATCH 08/10] add lookup hash in allocation changes --- .../0chain.net/blobbercore/allocation/allocationchange.go | 1 + .../0chain.net/blobbercore/handler/file_command_delete.go | 2 ++ .../0chain.net/blobbercore/handler/file_command_update.go | 1 + .../0chain.net/blobbercore/handler/file_command_upload.go | 1 + goose/migrations/1717416291_change_lookuphash.sql | 6 ++++++ 5 files changed, 11 insertions(+) create mode 100644 goose/migrations/1717416291_change_lookuphash.sql diff --git a/code/go/0chain.net/blobbercore/allocation/allocationchange.go b/code/go/0chain.net/blobbercore/allocation/allocationchange.go index ea6c60975..d835d7671 100644 --- a/code/go/0chain.net/blobbercore/allocation/allocationchange.go +++ b/code/go/0chain.net/blobbercore/allocation/allocationchange.go @@ -73,6 +73,7 @@ type AllocationChange struct { Connection AllocationChangeCollector `gorm:"foreignKey:ConnectionID"` // References allocation_connections(id) Input string `gorm:"column:input"` FilePath string `gorm:"-"` + LookupHash string `gorm:"column:lookup_hash;size:64"` datastore.ModelWithTS } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_delete.go b/code/go/0chain.net/blobbercore/handler/file_command_delete.go index 9287ebe45..b8b9ae0f2 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_delete.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_delete.go @@ -57,6 +57,7 @@ func (cmd *DeleteFileCommand) IsValidated(ctx context.Context, req *http.Request } return common.NewError("bad_db_operation", err.Error()) } + cmd.existingFileRef.LookupHash = lookUpHash return nil } @@ -92,6 +93,7 @@ func (cmd *DeleteFileCommand) ProcessContent(_ context.Context, allocationObj *a cmd.allocationChange.ConnectionID = connectionID cmd.allocationChange.Size = 0 - deleteSize cmd.allocationChange.Operation = constants.FileOperationDelete + cmd.allocationChange.LookupHash = cmd.existingFileRef.LookupHash allocation.UpdateConnectionObjSize(connectionID, cmd.allocationChange.Size) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_update.go b/code/go/0chain.net/blobbercore/handler/file_command_update.go index cf7973d10..ef2458141 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_update.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_update.go @@ -233,6 +233,7 @@ func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context, connectionObj *a func (cmd *UpdateFileCommand) AddChange(ctx context.Context) error { connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.LookupHash = cmd.existingFileRef.LookupHash cmd.allocationChange.Input = connectionInput return cmd.allocationChange.Create(ctx) } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index 0e5a09994..caf96f33a 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -257,6 +257,7 @@ func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *a func (cmd *UploadFileCommand) AddChange(ctx context.Context) error { connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.LookupHash = reference.GetReferenceLookup(cmd.fileChanger.AllocationID, cmd.fileChanger.Path) cmd.allocationChange.Input = connectionInput return cmd.allocationChange.Create(ctx) } diff --git a/goose/migrations/1717416291_change_lookuphash.sql b/goose/migrations/1717416291_change_lookuphash.sql new file mode 100644 index 000000000..e2d8e71dd --- /dev/null +++ b/goose/migrations/1717416291_change_lookuphash.sql @@ -0,0 +1,6 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE allocation_changes ADD COLUMN lookup_hash character varying(64); + +-- CREATE UNIQUE INDEX idx_allocation_changes_lookup_hash ON allocation_changes USING HASH(lookup_hash,connection_id); +-- +goose StatementEnd \ No newline at end of file From 7742e9f308a8c1ada14a4390b82e093f56ec9c32 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Mon, 3 Jun 2024 19:56:08 +0530 Subject: [PATCH 09/10] add lookup hash to connection change --- code/go/0chain.net/blobbercore/allocation/connection.go | 5 ----- .../0chain.net/blobbercore/handler/file_command_upload.go | 7 ------- 2 files changed, 12 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 633011a05..06779a6e6 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -166,7 +166,6 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) { } func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string, cmd FileCommand, isFinal bool, contentSize, offset, dataWritten, addSize int64) (bool, error) { - now := time.Now() connectionObjMutex.RLock() connectionObj := connectionProcessor[connectionID] connectionObjMutex.RUnlock() @@ -177,9 +176,7 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string connectionObj.UpdatedAt = time.Now() saveChange := false change := connectionObj.changes[pathHash] - var elapsedChange time.Duration if change == nil { - changeTime := time.Now() change = &ConnectionChange{} connectionObj.changes[pathHash] = change change.lock.Lock() @@ -194,7 +191,6 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string change.seqPQ = seqpriorityqueue.NewSeqPriorityQueue(contentSize) go hasher.Start(connectionObj.ctx, connectionID, connectionObj.AllocationID, fileName, pathHash, change.seqPQ) saveChange = true - elapsedChange = time.Since(changeTime) } else { change.lock.Lock() defer change.lock.Unlock() @@ -219,7 +215,6 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string DataBytes: dataWritten, }) } - logging.Logger.Info("saveFileChange: ", zap.Duration("elapsedChange", elapsedChange), zap.Duration("total", time.Since(now))) return saveChange, nil } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index caf96f33a..ded271b1e 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -8,7 +8,6 @@ import ( "mime/multipart" "net/http" "path/filepath" - "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -139,21 +138,16 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj FilePathHash: cmd.fileChanger.PathHash, Size: cmd.fileChanger.Size, } - now := time.Now() fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionID, fileInputData, cmd.contentFile) if err != nil { logging.Logger.Error("UploadFileCommand.ProcessContent", zap.Error(err)) return result, common.NewError("upload_error", "Failed to write file. "+err.Error()) } - elapsedWriteFile := time.Since(now) - now = time.Now() result.Filename = cmd.fileChanger.Filename result.ValidationRoot = fileOutputData.ValidationRoot result.Size = fileOutputData.Size allocationSize := allocation.GetConnectionObjSize(connectionID) - elapsedGetConnectionObjSize := time.Since(now) - now = time.Now() cmd.fileChanger.AllocationID = allocationObj.ID cmd.allocationChange = &allocation.AllocationChange{} @@ -196,7 +190,6 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj return result, common.NewError("max_allocation_size", "Max size reached for the allocation with this blobber") } - logging.Logger.Info("UploadFileCommand: ", zap.Duration("elapsedWriteFile", elapsedWriteFile), zap.Duration("elapsedGetConnectionObjSize", elapsedGetConnectionObjSize), zap.Duration("elapsedSaveChange", time.Since(now))) return result, nil } From f6d2d3a930fd306b29e3a93d0e3adc05914e2bbd Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Mon, 3 Jun 2024 20:08:57 +0530 Subject: [PATCH 10/10] fix unit test --- .../blobbercore/handler/handler_test.go | 17 ++++------------- .../handler/object_operation_handler.go | 3 +++ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/handler_test.go b/code/go/0chain.net/blobbercore/handler/handler_test.go index 3bafa3a5b..1a393669d 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_test.go @@ -277,7 +277,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) @@ -657,7 +657,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) @@ -743,7 +743,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) @@ -847,17 +847,8 @@ func TestHandlers_Requiring_Signature(t *testing.T) { sqlmock.NewRows([]string{"found"}). AddRow(false), ) - mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "allocation_connections" WHERE`)). - WithArgs(connectionID, alloc.ID, alloc.OwnerID, allocation.DeletedConnection). - WillReturnRows( - sqlmock.NewRows([]string{}). - AddRow(), - ) - mock.ExpectExec(`INSERT INTO "allocation_connections"`). - WithArgs(aa, aa, aa, aa, aa, aa, aa). - WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 371d00b08..3144d1054 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -842,6 +842,7 @@ func (fsh *StorageHandler) RenameObject(ctx context.Context, r *http.Request) (i allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ID allocationChange.Size = 0 + allocationChange.LookupHash = pathHash allocationChange.Operation = constants.FileOperationRename dfc := &allocation.RenameFileChange{ConnectionID: connectionObj.ID, AllocationID: connectionObj.AllocationID, Path: objectRef.Path, Type: objectRef.Type} @@ -951,6 +952,7 @@ func (fsh *StorageHandler) CopyObject(ctx context.Context, r *http.Request) (int allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ID allocationChange.Size = objectRef.Size + allocationChange.LookupHash = pathHash allocationChange.Operation = constants.FileOperationCopy dfc := &allocation.CopyFileChange{ConnectionID: connectionObj.ID, AllocationID: connectionObj.AllocationID, DestPath: destPath} @@ -1062,6 +1064,7 @@ func (fsh *StorageHandler) MoveObject(ctx context.Context, r *http.Request) (int allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ID allocationChange.Size = 0 + allocationChange.LookupHash = pathHash allocationChange.Operation = constants.FileOperationMove dfc := &allocation.MoveFileChange{ ConnectionID: connectionObj.ID,