From 7d76da93099ecbe43799af85799c7138b951a2fd Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 7 Mar 2022 15:31:39 +0800 Subject: [PATCH 1/3] fix(challenge): removed unused lock on challenge worker --- code/go/0chain.net/blobbercore/challenge/challenge.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/code/go/0chain.net/blobbercore/challenge/challenge.go b/code/go/0chain.net/blobbercore/challenge/challenge.go index 23f171d0f..4a00b6f4f 100644 --- a/code/go/0chain.net/blobbercore/challenge/challenge.go +++ b/code/go/0chain.net/blobbercore/challenge/challenge.go @@ -10,7 +10,6 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" - "github.com/0chain/blobber/code/go/0chain.net/core/lock" "github.com/0chain/blobber/code/go/0chain.net/core/node" "github.com/0chain/blobber/code/go/0chain.net/core/transaction" "github.com/remeh/sizedwaitgroup" @@ -142,9 +141,6 @@ func processAccepted(ctx context.Context) { // loadValidationTickets load validation tickets for challenge func loadValidationTickets(ctx context.Context, challengeObj *ChallengeEntity) error { - mutex := lock.GetMutex(challengeObj.TableName(), challengeObj.ChallengeID) - mutex.Lock() - defer func() { if r := recover(); r != nil { logging.Logger.Error("[recover]LoadValidationTickets", zap.Any("err", r)) @@ -179,8 +175,7 @@ func commitProcessed(ctx context.Context) { if err := openchallenge.UnmarshalFields(); err != nil { logging.Logger.Error("ChallengeEntity_UnmarshalFields", zap.String("challenge_id", openchallenge.ChallengeID), zap.Error(err)) } - mutex := lock.GetMutex(openchallenge.TableName(), openchallenge.ChallengeID) - mutex.Lock() + redeemCtx := datastore.GetStore().CreateTransaction(ctx) err := openchallenge.CommitChallenge(redeemCtx, false) if err != nil { @@ -188,7 +183,7 @@ func commitProcessed(ctx context.Context) { zap.Error(err), zap.String("challenge_id", openchallenge.ChallengeID)) } - mutex.Unlock() + db := datastore.GetStore().GetTransaction(redeemCtx) db.Commit() if err == nil && openchallenge.Status == Committed { From 6124cb03248930652358fbf9fae7256aca3c0124 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 7 Mar 2022 15:36:51 +0800 Subject: [PATCH 2/3] fix(cleanup):improved lock code --- .../0chain.net/blobbercore/handler/worker.go | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/worker.go b/code/go/0chain.net/blobbercore/handler/worker.go index cb161e864..125514b98 100644 --- a/code/go/0chain.net/blobbercore/handler/worker.go +++ b/code/go/0chain.net/blobbercore/handler/worker.go @@ -31,27 +31,32 @@ func CleanupDiskFiles(ctx context.Context) error { var allocations []allocation.Allocation db.Find(&allocations) for _, allocationObj := range allocations { - mutex := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) - mutex.Lock() - _ = filestore.GetFileStore().IterateObjects(allocationObj.ID, func(contentHash string, contentSize int64) { - var refs []reference.Ref - err := db.Table((reference.Ref{}).TableName()).Where(reference.Ref{ContentHash: contentHash, Type: reference.FILE}).Or(reference.Ref{ThumbnailHash: contentHash, Type: reference.FILE}).Find(&refs).Error - if err != nil { - Logger.Error("Error in cleanup of disk files.", zap.Error(err)) - return - } - if len(refs) == 0 { - Logger.Info("hash has no references. Deleting from disk", zap.Any("count", len(refs)), zap.String("hash", contentHash)) - if err := filestore.GetFileStore().DeleteFile(allocationObj.ID, contentHash); err != nil { - Logger.Error("FileStore_DeleteFile", zap.String("content_hash", contentHash), zap.Error(err)) - } - } - }) - mutex.Unlock() + cleanupAllocationFiles(&allocationObj) } return nil } +func cleanupAllocationFiles(allocationObj *allocation.Allocation) { + mutex := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) + mutex.Lock() + defer mutex.Unlock() + _ = filestore.GetFileStore().IterateObjects(allocationObj.ID, func(contentHash string, contentSize int64) { + var refs []reference.Ref + err := db.Table((reference.Ref{}).TableName()).Where(reference.Ref{ContentHash: contentHash, Type: reference.FILE}).Or(reference.Ref{ThumbnailHash: contentHash, Type: reference.FILE}).Find(&refs).Error + if err != nil { + Logger.Error("Error in cleanup of disk files.", zap.Error(err)) + return + } + if len(refs) == 0 { + Logger.Info("hash has no references. Deleting from disk", zap.Any("count", len(refs)), zap.String("hash", contentHash)) + if err := filestore.GetFileStore().DeleteFile(allocationObj.ID, contentHash); err != nil { + Logger.Error("FileStore_DeleteFile", zap.String("content_hash", contentHash), zap.Error(err)) + } + } + }) + +} + func cleanupTempFiles(ctx context.Context) { defer func() { if r := recover(); r != nil { From d566ed9589ce7e3b3092eb27df0b612df98e156b Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 7 Mar 2022 15:38:42 +0800 Subject: [PATCH 3/3] fix(cleanup):improved lock code --- code/go/0chain.net/blobbercore/handler/worker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/worker.go b/code/go/0chain.net/blobbercore/handler/worker.go index 125514b98..1a4692722 100644 --- a/code/go/0chain.net/blobbercore/handler/worker.go +++ b/code/go/0chain.net/blobbercore/handler/worker.go @@ -10,6 +10,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/stats" "github.com/0chain/blobber/code/go/0chain.net/core/lock" + "gorm.io/gorm" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -31,12 +32,12 @@ func CleanupDiskFiles(ctx context.Context) error { var allocations []allocation.Allocation db.Find(&allocations) for _, allocationObj := range allocations { - cleanupAllocationFiles(&allocationObj) + cleanupAllocationFiles(db, &allocationObj) } return nil } -func cleanupAllocationFiles(allocationObj *allocation.Allocation) { +func cleanupAllocationFiles(db *gorm.DB, allocationObj *allocation.Allocation) { mutex := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) mutex.Lock() defer mutex.Unlock()