Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status mutex #1323

Merged
merged 1 commit into from Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobbercore/challenge/entity.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
Expand Down Expand Up @@ -106,6 +107,7 @@ type ChallengeEntity struct {
RoundCreatedAt int64 `gorm:"round_created_at" json:"round_created_at"`
CreatedAt common.Timestamp `gorm:"created_at" json:"created"`
UpdatedAt time.Time `gorm:"updated_at;type:timestamp without time zone;not null;default:current_timestamp" json:"-"`
statusMutex *sync.Mutex `gorm:"-" json:"-"`
}

func (ChallengeEntity) TableName() string {
Expand Down
6 changes: 6 additions & 0 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Expand Up @@ -49,7 +49,9 @@ func (cr *ChallengeEntity) CancelChallenge(ctx context.Context, errReason error)
cancellation := time.Now()
db := datastore.GetStore().GetTransaction(ctx)
deleteChallenge(cr.RoundCreatedAt)
cr.statusMutex.Lock()
cr.Status = Cancelled
cr.statusMutex.Unlock()
cr.StatusMessage = errReason.Error()
cr.UpdatedAt = cancellation.UTC()
if err := db.Save(cr).Error; err != nil {
Expand Down Expand Up @@ -311,7 +313,9 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
accessMu.RUnlock()
if numSuccess > (len(cr.Validators) / 2) {
cr.Result = ChallengeSuccess
cr.statusMutex.Lock()
cr.Status = Processed
cr.statusMutex.Unlock()
cr.UpdatedAt = time.Now().UTC()
} else {
cr.CancelChallenge(ctx, ErrNoConsensusChallenge)
Expand Down Expand Up @@ -378,7 +382,9 @@ func IsEntityNotFoundError(err error) bool {
}

func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transaction.Transaction, toAdd bool) {
cr.statusMutex.Lock()
cr.Status = Committed
cr.statusMutex.Unlock()
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
cr.UpdatedAt = time.Now().UTC()
Expand Down
25 changes: 21 additions & 4 deletions code/go/0chain.net/blobbercore/challenge/worker.go
Expand Up @@ -210,23 +210,34 @@ func commitOnChainWorker(ctx context.Context) {

func getBatch(batchSize int) (chall []ChallengeEntity) {
challengeMapLock.RLock()
defer challengeMapLock.RUnlock()

if challengeMap.Size() == 0 {
challengeMapLock.RUnlock()
return
}

toDel := make([]int64, 0)
it := challengeMap.Iterator()
for it.Next() {
if len(chall) >= batchSize {
break
}
ticket := it.Value().(*ChallengeEntity)
if ticket.Status != Processed {
ticket.statusMutex.Lock()
if ticket.Status == Accepted {
break
}
if ticket.Status != Processed {
if checkExpiry(ticket.RoundCreatedAt) {
toDel = append(toDel, ticket.RoundCreatedAt)
}
continue
}
ticket.statusMutex.Unlock()
chall = append(chall, *ticket)
}
challengeMapLock.RUnlock()
for _, key := range toDel {
deleteChallenge(key)
}
return
}

Expand All @@ -247,6 +258,7 @@ func (it *ChallengeEntity) createChallenge(ctx context.Context) bool {
logging.Logger.Info("createChallenge", zap.String("challenge_id", it.ChallengeID), zap.String("status", "already exists"))
return false
}
it.statusMutex = &sync.Mutex{}
challengeMap.Put(it.RoundCreatedAt, it)
return true
}
Expand All @@ -256,3 +268,8 @@ func deleteChallenge(key int64) {
challengeMap.Remove(key)
challengeMapLock.Unlock()
}

func checkExpiry(roundCreatedAt int64) bool {
currentRound := roundInfo.CurrentRound + int64(float64(roundInfo.LastRoundDiff)*(float64(time.Since(roundInfo.CurrentRoundCaptureTime).Milliseconds())/float64(GetRoundInterval.Milliseconds())))
return currentRound-roundCreatedAt > config.StorageSCConfig.ChallengeCompletionTime
}