diff --git a/code/go/0chain.net/blobbercore/challenge/challenge.go b/code/go/0chain.net/blobbercore/challenge/challenge.go index 23f171d0f..4a00b6f4f 100644 --- a/code/go/0chain.net/blobbercore/challenge/challenge.go +++ b/code/go/0chain.net/blobbercore/challenge/challenge.go @@ -10,7 +10,6 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" - "github.com/0chain/blobber/code/go/0chain.net/core/lock" "github.com/0chain/blobber/code/go/0chain.net/core/node" "github.com/0chain/blobber/code/go/0chain.net/core/transaction" "github.com/remeh/sizedwaitgroup" @@ -142,9 +141,6 @@ func processAccepted(ctx context.Context) { // loadValidationTickets load validation tickets for challenge func loadValidationTickets(ctx context.Context, challengeObj *ChallengeEntity) error { - mutex := lock.GetMutex(challengeObj.TableName(), challengeObj.ChallengeID) - mutex.Lock() - defer func() { if r := recover(); r != nil { logging.Logger.Error("[recover]LoadValidationTickets", zap.Any("err", r)) @@ -179,8 +175,7 @@ func commitProcessed(ctx context.Context) { if err := openchallenge.UnmarshalFields(); err != nil { logging.Logger.Error("ChallengeEntity_UnmarshalFields", zap.String("challenge_id", openchallenge.ChallengeID), zap.Error(err)) } - mutex := lock.GetMutex(openchallenge.TableName(), openchallenge.ChallengeID) - mutex.Lock() + redeemCtx := datastore.GetStore().CreateTransaction(ctx) err := openchallenge.CommitChallenge(redeemCtx, false) if err != nil { @@ -188,7 +183,7 @@ func commitProcessed(ctx context.Context) { zap.Error(err), zap.String("challenge_id", openchallenge.ChallengeID)) } - mutex.Unlock() + db := datastore.GetStore().GetTransaction(redeemCtx) db.Commit() if err == nil && openchallenge.Status == Committed { diff --git a/code/go/0chain.net/blobbercore/handler/worker.go b/code/go/0chain.net/blobbercore/handler/worker.go index cb161e864..1a4692722 100644 --- a/code/go/0chain.net/blobbercore/handler/worker.go +++ b/code/go/0chain.net/blobbercore/handler/worker.go @@ -10,6 +10,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/stats" "github.com/0chain/blobber/code/go/0chain.net/core/lock" + "gorm.io/gorm" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -31,27 +32,32 @@ func CleanupDiskFiles(ctx context.Context) error { var allocations []allocation.Allocation db.Find(&allocations) for _, allocationObj := range allocations { - mutex := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) - mutex.Lock() - _ = filestore.GetFileStore().IterateObjects(allocationObj.ID, func(contentHash string, contentSize int64) { - var refs []reference.Ref - err := db.Table((reference.Ref{}).TableName()).Where(reference.Ref{ContentHash: contentHash, Type: reference.FILE}).Or(reference.Ref{ThumbnailHash: contentHash, Type: reference.FILE}).Find(&refs).Error - if err != nil { - Logger.Error("Error in cleanup of disk files.", zap.Error(err)) - return - } - if len(refs) == 0 { - Logger.Info("hash has no references. Deleting from disk", zap.Any("count", len(refs)), zap.String("hash", contentHash)) - if err := filestore.GetFileStore().DeleteFile(allocationObj.ID, contentHash); err != nil { - Logger.Error("FileStore_DeleteFile", zap.String("content_hash", contentHash), zap.Error(err)) - } - } - }) - mutex.Unlock() + cleanupAllocationFiles(db, &allocationObj) } return nil } +func cleanupAllocationFiles(db *gorm.DB, allocationObj *allocation.Allocation) { + mutex := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) + mutex.Lock() + defer mutex.Unlock() + _ = filestore.GetFileStore().IterateObjects(allocationObj.ID, func(contentHash string, contentSize int64) { + var refs []reference.Ref + err := db.Table((reference.Ref{}).TableName()).Where(reference.Ref{ContentHash: contentHash, Type: reference.FILE}).Or(reference.Ref{ThumbnailHash: contentHash, Type: reference.FILE}).Find(&refs).Error + if err != nil { + Logger.Error("Error in cleanup of disk files.", zap.Error(err)) + return + } + if len(refs) == 0 { + Logger.Info("hash has no references. Deleting from disk", zap.Any("count", len(refs)), zap.String("hash", contentHash)) + if err := filestore.GetFileStore().DeleteFile(allocationObj.ID, contentHash); err != nil { + Logger.Error("FileStore_DeleteFile", zap.String("content_hash", contentHash), zap.Error(err)) + } + } + }) + +} + func cleanupTempFiles(ctx context.Context) { defer func() { if r := recover(); r != nil {