diff --git a/code/go/0chain.net/blobbercore/allocation/allocationchange.go b/code/go/0chain.net/blobbercore/allocation/allocationchange.go index e24714b29..d835d7671 100644 --- a/code/go/0chain.net/blobbercore/allocation/allocationchange.go +++ b/code/go/0chain.net/blobbercore/allocation/allocationchange.go @@ -73,6 +73,7 @@ type AllocationChange struct { Connection AllocationChangeCollector `gorm:"foreignKey:ConnectionID"` // References allocation_connections(id) Input string `gorm:"column:input"` FilePath string `gorm:"-"` + LookupHash string `gorm:"column:lookup_hash;size:64"` datastore.ModelWithTS } @@ -97,6 +98,11 @@ func (change *AllocationChange) Save(ctx context.Context) error { return db.Save(change).Error } +func (change *AllocationChange) Create(ctx context.Context) error { + db := datastore.GetStore().GetTransaction(ctx) + return db.Create(change).Error +} + func ParseAffectedFilePath(input string) (string, error) { inputMap := make(map[string]interface{}) err := json.Unmarshal([]byte(input), &inputMap) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 2f00e50b6..06779a6e6 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -182,11 +182,7 @@ func SaveFileChange(ctx context.Context, connectionID, pathHash, fileName string change.lock.Lock() defer change.lock.Unlock() connectionObj.lock.Unlock() - dbConnectionObj, err := GetAllocationChanges(ctx, connectionID, connectionObj.AllocationID, connectionObj.ClientID) - if err != nil { - return saveChange, err - } - err = cmd.UpdateChange(ctx, dbConnectionObj) + err := cmd.AddChange(ctx) if err != nil { return saveChange, err } diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go index fb5515ab9..3bc327820 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go @@ -96,6 +96,9 @@ type FileCommand interface { // UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error + // AddChange add Allocation change to db + AddChange(ctx context.Context) error + //NumBlocks return number of blocks uploaded by the client GetNumBlocks() int64 } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_delete.go b/code/go/0chain.net/blobbercore/handler/file_command_delete.go index 5508fb878..b8b9ae0f2 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_delete.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_delete.go @@ -57,6 +57,7 @@ func (cmd *DeleteFileCommand) IsValidated(ctx context.Context, req *http.Request } return common.NewError("bad_db_operation", err.Error()) } + cmd.existingFileRef.LookupHash = lookUpHash return nil } @@ -67,6 +68,12 @@ func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context, connectionObj *a return connectionObj.Save(ctx) } +func (cmd *DeleteFileCommand) AddChange(ctx context.Context) error { + connectionInput, _ := cmd.changeProcessor.Marshal() + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Create(ctx) +} + // ProcessContent flush file to FileStorage func (cmd *DeleteFileCommand) ProcessContent(_ context.Context, allocationObj *allocation.Allocation) (allocation.UploadResult, error) { deleteSize := cmd.existingFileRef.Size @@ -86,6 +93,7 @@ func (cmd *DeleteFileCommand) ProcessContent(_ context.Context, allocationObj *a cmd.allocationChange.ConnectionID = connectionID cmd.allocationChange.Size = 0 - deleteSize cmd.allocationChange.Operation = constants.FileOperationDelete + cmd.allocationChange.LookupHash = cmd.existingFileRef.LookupHash allocation.UpdateConnectionObjSize(connectionID, cmd.allocationChange.Size) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_update.go b/code/go/0chain.net/blobbercore/handler/file_command_update.go index e798d7dee..ef2458141 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_update.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_update.go @@ -231,6 +231,13 @@ func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context, connectionObj *a return connectionObj.Save(ctx) } +func (cmd *UpdateFileCommand) AddChange(ctx context.Context) error { + connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.LookupHash = cmd.existingFileRef.LookupHash + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Create(ctx) +} + func (cmd *UpdateFileCommand) GetNumBlocks() int64 { if cmd.fileChanger.IsFinal { return int64(math.Ceil(float64(cmd.fileChanger.Size*1.0) / float64(cmd.fileChanger.ChunkSize))) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index a98f39576..ded271b1e 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -123,7 +123,6 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request // ProcessContent flush file to FileStorage func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj *allocation.Allocation) (allocation.UploadResult, error) { - logging.Logger.Info("UploadFileCommand.ProcessContent", zap.Any("fileChanger", cmd.fileChanger.Path), zap.Any("uploadOffset", cmd.fileChanger.UploadOffset), zap.Any("isFinal", cmd.fileChanger.IsFinal)) result := allocation.UploadResult{} defer cmd.contentFile.Close() @@ -144,13 +143,11 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, allocationObj logging.Logger.Error("UploadFileCommand.ProcessContent", zap.Error(err)) return result, common.NewError("upload_error", "Failed to write file. "+err.Error()) } - result.Filename = cmd.fileChanger.Filename result.ValidationRoot = fileOutputData.ValidationRoot result.Size = fileOutputData.Size allocationSize := allocation.GetConnectionObjSize(connectionID) - cmd.fileChanger.AllocationID = allocationObj.ID cmd.allocationChange = &allocation.AllocationChange{} @@ -233,7 +230,7 @@ func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *a if c.Operation != constants.FileOperationInsert || cmd.fileChanger.Path != filePath { continue } - c.Size = connectionObj.Size + c.Size = cmd.fileChanger.Size c.Input, _ = cmd.fileChanger.Marshal() //c.ModelWithTS.UpdatedAt = time.Now() @@ -251,6 +248,13 @@ func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *a return connectionObj.Save(ctx) } +func (cmd *UploadFileCommand) AddChange(ctx context.Context) error { + connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.LookupHash = reference.GetReferenceLookup(cmd.fileChanger.AllocationID, cmd.fileChanger.Path) + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Create(ctx) +} + func (cmd *UploadFileCommand) GetNumBlocks() int64 { if cmd.fileChanger.IsFinal { return int64(math.Ceil(float64(cmd.fileChanger.Size*1.0) / float64(cmd.fileChanger.ChunkSize))) diff --git a/code/go/0chain.net/blobbercore/handler/handler_test.go b/code/go/0chain.net/blobbercore/handler/handler_test.go index 3bafa3a5b..1a393669d 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_test.go @@ -277,7 +277,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) @@ -657,7 +657,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) @@ -743,7 +743,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) @@ -847,17 +847,8 @@ func TestHandlers_Requiring_Signature(t *testing.T) { sqlmock.NewRows([]string{"found"}). AddRow(false), ) - mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "allocation_connections" WHERE`)). - WithArgs(connectionID, alloc.ID, alloc.OwnerID, allocation.DeletedConnection). - WillReturnRows( - sqlmock.NewRows([]string{}). - AddRow(), - ) - mock.ExpectExec(`INSERT INTO "allocation_connections"`). - WithArgs(aa, aa, aa, aa, aa, aa, aa). - WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectQuery(regexp.QuoteMeta(`INSERT INTO "allocation_changes"`)). - WithArgs(aa, aa, aa, aa, aa, aa). + WithArgs(aa, aa, aa, aa, aa, aa, aa). WillReturnRows( sqlmock.NewRows([]string{}), ) diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index f5dbe5ccc..3144d1054 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -842,6 +842,7 @@ func (fsh *StorageHandler) RenameObject(ctx context.Context, r *http.Request) (i allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ID allocationChange.Size = 0 + allocationChange.LookupHash = pathHash allocationChange.Operation = constants.FileOperationRename dfc := &allocation.RenameFileChange{ConnectionID: connectionObj.ID, AllocationID: connectionObj.AllocationID, Path: objectRef.Path, Type: objectRef.Type} @@ -951,6 +952,7 @@ func (fsh *StorageHandler) CopyObject(ctx context.Context, r *http.Request) (int allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ID allocationChange.Size = objectRef.Size + allocationChange.LookupHash = pathHash allocationChange.Operation = constants.FileOperationCopy dfc := &allocation.CopyFileChange{ConnectionID: connectionObj.ID, AllocationID: connectionObj.AllocationID, DestPath: destPath} @@ -1062,6 +1064,7 @@ func (fsh *StorageHandler) MoveObject(ctx context.Context, r *http.Request) (int allocationChange := &allocation.AllocationChange{} allocationChange.ConnectionID = connectionObj.ID allocationChange.Size = 0 + allocationChange.LookupHash = pathHash allocationChange.Operation = constants.FileOperationMove dfc := &allocation.MoveFileChange{ ConnectionID: connectionObj.ID, @@ -1278,12 +1281,15 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all } elapsedVerifySig := time.Since(st) + st = time.Now() cmd := createFileCommand(r) err = cmd.IsValidated(ctx, r, allocationObj, clientID) if err != nil { return nil, err } + elapsedIsValidated := time.Since(st) + st = time.Now() // call process content, which writes to file checks if conn obj needs to be updated and if commit hasher needs to be called res, err := cmd.ProcessContent(ctx, allocationObj) if err != nil { @@ -1305,6 +1311,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all if blocks > 0 { go AddUploadedData(clientID, blocks) } + elapsedProcessContent := time.Since(st) Logger.Info("[upload]elapsed", zap.String("alloc_id", allocationID), zap.String("file", cmd.GetPath()), @@ -1312,7 +1319,8 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all zap.Duration("get_processor", elapsedGetConnectionProcessor), zap.Duration("get_alloc", elapsedAllocation), zap.Duration("sig", elapsedVerifySig), - zap.Duration("validate", time.Since(st)), + zap.Duration("validate", elapsedIsValidated), + zap.Duration("process_content", elapsedProcessContent), zap.Duration("total", time.Since(startTime))) return &res, nil } diff --git a/goose/migrations/1717416291_change_lookuphash.sql b/goose/migrations/1717416291_change_lookuphash.sql new file mode 100644 index 000000000..e2d8e71dd --- /dev/null +++ b/goose/migrations/1717416291_change_lookuphash.sql @@ -0,0 +1,6 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE allocation_changes ADD COLUMN lookup_hash character varying(64); + +-- CREATE UNIQUE INDEX idx_allocation_changes_lookup_hash ON allocation_changes USING HASH(lookup_hash,connection_id); +-- +goose StatementEnd \ No newline at end of file