Skip to content

Commit

Permalink
Use WithNewTransaction (#1267)
Browse files Browse the repository at this point in the history
* use withNew transaction

* fix blobberstats
  • Loading branch information
Hitenjain14 committed Oct 1, 2023
1 parent c9df376 commit c506cf6
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 100 deletions.
35 changes: 4 additions & 31 deletions code/go/0chain.net/blobbercore/challenge/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
Expand Down Expand Up @@ -166,12 +165,7 @@ func validateOnValidators(ctx context.Context, c *ChallengeEntity) error {
return nil
}

func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, error) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
defer ctx.Done()

tx := datastore.GetStore().GetTransaction(ctx)

func (c *ChallengeEntity) getCommitTransaction(ctx context.Context) (*transaction.Transaction, error) {
createdTime := common.ToTime(c.CreatedAt)
logging.Logger.Info("[challenge]commit",
zap.Any("challenge_id", c.ChallengeID),
Expand All @@ -180,24 +174,14 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro

if time.Since(common.ToTime(c.CreatedAt)) > config.StorageSCConfig.ChallengeCompletionTime {
c.CancelChallenge(ctx, ErrExpiredCCT)
if err := tx.Commit().Error; err != nil {
logging.Logger.Error("[challenge]verify(Commit): ",
zap.Any("challenge_id", c.ChallengeID),
zap.Error(err))
}
return nil, ErrExpiredCCT
return nil, nil
}

txn, err := transaction.NewTransactionEntity()
if err != nil {
logging.Logger.Error("[challenge]createTxn", zap.Error(err))
c.CancelChallenge(ctx, err)
if err := tx.Commit().Error; err != nil {
logging.Logger.Error("[challenge]verify(Commit): ",
zap.Any("challenge_id", c.ChallengeID),
zap.Error(err))
}
return nil, err
return nil, nil
}

sn := &ChallengeResponse{}
Expand All @@ -212,12 +196,7 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro
if err != nil {
logging.Logger.Info("Failed submitting challenge to the mining network", zap.String("err:", err.Error()))
c.CancelChallenge(ctx, err)
if err := tx.Commit().Error; err != nil {
logging.Logger.Error("[challenge]verify(Commit): ",
zap.Any("challenge_id", c.ChallengeID),
zap.Error(err))
}
return nil, err
return nil, nil
}

err = UpdateChallengeTimingTxnSubmission(c.ChallengeID, txn.CreationDate)
Expand All @@ -229,11 +208,5 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro
zap.Error(err))
}

if err := tx.Commit().Error; err != nil {
logging.Logger.Error("[challenge]verify(Commit): ",
zap.Any("challenge_id", c.ChallengeID),
zap.Error(err))
}

return txn, nil
}
18 changes: 1 addition & 17 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
return nil
}

func (cr *ChallengeEntity) VerifyChallengeTransaction(txn *transaction.Transaction) error {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
defer ctx.Done()

tx := datastore.GetStore().GetTransaction(ctx)

func (cr *ChallengeEntity) VerifyChallengeTransaction(ctx context.Context, txn *transaction.Transaction) error {
if len(cr.LastCommitTxnIDs) > 0 {
for _, lastTxn := range cr.LastCommitTxnIDs {
logging.Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn)
Expand Down Expand Up @@ -373,11 +368,6 @@ func (cr *ChallengeEntity) VerifyChallengeTransaction(txn *transaction.Transacti
err = ErrEntityNotFound
}
_ = cr.Save(ctx)
if commitErr := tx.Commit().Error; commitErr != nil {
logging.Logger.Error("[challenge]verify(Commit): ",
zap.Any("challenge_id", cr.ChallengeID),
zap.Error(commitErr))
}
return err
}
logging.Logger.Info("Success response from BC for challenge response transaction", zap.String("txn", txn.TransactionOutput), zap.String("challenge_id", cr.ChallengeID))
Expand All @@ -394,7 +384,6 @@ func IsEntityNotFoundError(err error) bool {
}

func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transaction.Transaction, toAdd bool) {
tx := datastore.GetStore().GetTransaction(ctx)
cr.Status = Committed
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
Expand All @@ -417,9 +406,4 @@ func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transacti
zap.Time("txn_verified", txnVerification),
zap.Error(err))
}
if err := tx.Commit().Error; err != nil {
logging.Logger.Error("[challenge]verify(Commit): ",
zap.Any("challenge_id", cr.ChallengeID),
zap.Error(err))
}
}
22 changes: 17 additions & 5 deletions code/go/0chain.net/blobbercore/challenge/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
"github.com/emirpasic/gods/maps/treemap"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -141,7 +142,15 @@ func commitOnChainWorker(ctx context.Context) {

for _, challenge := range challenges {
chall := challenge
txn, _ := chall.getCommitTransaction()
var (
txn *transaction.Transaction
err error
)
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
txn, err = chall.getCommitTransaction(ctx)
return err
})

if txn != nil {
wg.Add(1)
go func(challenge *ChallengeEntity) {
Expand All @@ -151,10 +160,13 @@ func commitOnChainWorker(ctx context.Context) {
logging.Logger.Error("verifyChallengeTransaction", zap.Any("err", r))
}
}()
err := challenge.VerifyChallengeTransaction(txn)
if err == nil || err != ErrEntityNotFound {
deleteChallenge(int64(challenge.RoundCreatedAt))
}
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
err := challenge.VerifyChallengeTransaction(ctx, txn)
if err == nil || err != ErrEntityNotFound {
deleteChallenge(int64(challenge.RoundCreatedAt))
}
return nil
})
}(&chall)
}
}
Expand Down
65 changes: 32 additions & 33 deletions code/go/0chain.net/blobbercore/filestore/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,49 +51,48 @@ func (fs *FileStore) removeAllocation(ID string) {
}

func (fs *FileStore) initMap() error {
ctx, cnCl := context.WithCancel(context.Background())
defer cnCl()

ctx = datastore.GetStore().CreateTransaction(ctx)
db := datastore.GetStore().GetTransaction(ctx)
if db == nil {
return errors.New("could not get db client")
}
err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
db := datastore.GetStore().GetTransaction(ctx)
if db == nil {
return errors.New("could not get db client")
}

limitCh := make(chan struct{}, 50)
wg := &sync.WaitGroup{}
var dbAllocations []*dbAllocation
limitCh := make(chan struct{}, 50)
wg := &sync.WaitGroup{}
var dbAllocations []*dbAllocation

err := db.Model(&dbAllocation{}).FindInBatches(&dbAllocations, 1000, func(tx *gorm.DB, batch int) error {
allocsMap := make(map[string]*allocation)
err := db.Model(&dbAllocation{}).FindInBatches(&dbAllocations, 1000, func(tx *gorm.DB, batch int) error {
allocsMap := make(map[string]*allocation)

for _, dbAlloc := range dbAllocations {
a := allocation{
allocatedSize: uint64(dbAlloc.BlobberSize),
mu: &sync.Mutex{},
tmpMU: &sync.Mutex{},
}
for _, dbAlloc := range dbAllocations {
a := allocation{
allocatedSize: uint64(dbAlloc.BlobberSize),
mu: &sync.Mutex{},
tmpMU: &sync.Mutex{},
}

allocsMap[dbAlloc.ID] = &a
allocsMap[dbAlloc.ID] = &a

err := getStorageDetails(ctx, &a, dbAlloc.ID)
err := getStorageDetails(ctx, &a, dbAlloc.ID)

if err != nil {
return err
}
if err != nil {
return err
}

limitCh <- struct{}{}
wg.Add(1)
go fs.getTemporaryStorageDetails(ctx, &a, dbAlloc.ID, limitCh, wg)
limitCh <- struct{}{}
wg.Add(1)
go fs.getTemporaryStorageDetails(ctx, &a, dbAlloc.ID, limitCh, wg)

}
}

fs.setAllocations(allocsMap)
return nil
}).Error
fs.setAllocations(allocsMap)
return nil
}).Error

wg.Wait()
db.Commit()
wg.Wait()
db.Commit()
return err
})
return err
}

Expand Down
12 changes: 8 additions & 4 deletions code/go/0chain.net/blobbercore/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,12 +710,16 @@ func writeResponse(w http.ResponseWriter, resp []byte) {
// todo wrap with connection
func StatsHandler(w http.ResponseWriter, r *http.Request) {
isJSON := r.Header.Get("Accept") == "application/json"

if isJSON {
var (
blobberStats any
err error
)
blobberInfo := GetBlobberInfoJson()

ctx := datastore.GetStore().CreateTransaction(r.Context())
blobberStats, err := stats.StatsJSONHandler(ctx, r)
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
blobberStats, err = stats.StatsJSONHandler(ctx, r)
return err
})

if err != nil {
Logger.Error("Error getting blobber JSON stats", zap.Error(err))
Expand Down
5 changes: 3 additions & 2 deletions code/go/0chain.net/blobbercore/handler/handler_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ func WithStatusConnection(handler common.StatusCodeResponderF) common.StatusCode
func WithStatusReadOnlyConnection(handler common.StatusCodeResponderF) common.StatusCodeResponderF {
return func(ctx context.Context, r *http.Request) (interface{}, int, error) {
ctx = GetMetaDataStore().CreateTransaction(ctx)
resp, statusCode, err := handler(ctx, r)
tx := GetMetaDataStore().GetTransaction(ctx)
defer func() {
GetMetaDataStore().GetTransaction(ctx).Rollback()
tx.Rollback()
}()
resp, statusCode, err := handler(ctx, r)
return resp, statusCode, err
}
}
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/stats/challengestats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package stats

import (
"context"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"gorm.io/datatypes"
)

Expand All @@ -28,8 +28,8 @@ type ChallengeEntity struct {
LastCommitTxnIDs []string `json:"last_commit_txn_ids" gorm:"-"`
ObjectPathString datatypes.JSON `json:"-" gorm:"column:object_path"`
ObjectPath *reference.ObjectPath `json:"object_path" gorm:"-"`
CreatedAt time.Time `gorm:"created_at"`
UpdatedAt time.Time `gorm:"updated_at"`
CreatedAt common.Timestamp `gorm:"created_at"`
UpdatedAt common.Timestamp `gorm:"updated_at"`
}

func (ChallengeEntity) TableName() string {
Expand Down
12 changes: 7 additions & 5 deletions code/go/0chain.net/blobbercore/stats/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,13 @@ const tpl = `

func StatsHandler(w http.ResponseWriter, r *http.Request) {
t := template.Must(template.New("diagnostics").Funcs(funcMap).Parse(tpl))
ctx := datastore.GetStore().CreateTransaction(r.Context())
ctx = setStatsRequestDataInContext(ctx, r)
db := datastore.GetStore().GetTransaction(ctx)
defer db.Rollback()
bs := LoadBlobberStats(ctx)
ctx := setStatsRequestDataInContext(context.TODO(), r)
ctx = datastore.GetStore().CreateTransaction(ctx)
var bs *BlobberStats
_ = datastore.GetStore().WithTransaction(ctx, func(ctx context.Context) error {
bs = LoadBlobberStats(ctx)
return common.NewError("rollback", "read only")
})

err := t.Execute(w, bs)
if err != nil {
Expand Down

0 comments on commit c506cf6

Please sign in to comment.