Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func setupConfig() {
viper.GetDuration("write_lock_timeout") / time.Second,
)

config.Configuration.WriteMarkerLockTimeout = viper.GetDuration("write_marker_lock_timeout")

config.Configuration.UpdateAllocationsInterval =
viper.GetDuration("update_allocations_interval")

Expand Down
53 changes: 53 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/dao.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package allocation

import (
"context"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/errors"
"github.com/0chain/gosdk/constants"
"gorm.io/gorm"
)

// GetOrCreate, get allocation if it exists in db. if not, try to sync it from blockchain, and insert it in db.
func GetOrCreate(ctx context.Context, store datastore.Store, allocationTx string) (*Allocation, error) {
db := store.GetDB()

if len(allocationTx) == 0 {
return nil, errors.Throw(constants.ErrInvalidParameter, "tx")
}

alloc := &Allocation{}
result := db.Table(TableNameAllocation).Where(SQLWhereGetByTx, allocationTx).First(alloc)

if result.Error == nil {
return alloc, nil
}

if !errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, errors.ThrowLog(result.Error.Error(), common.ErrBadDataStore)
}

return SyncAllocation(allocationTx)

}

const (
SQLWhereGetByTx = "allocations.tx = ?"
)

// DryRun Creates a prepared statement when executing any SQL and caches them to speed up future calls
// https://gorm.io/docs/performance.html#Caches-Prepared-Statement
func DryRun(db *gorm.DB) {

// https://gorm.io/docs/session.html#DryRun
// Session mode
tx := db.Session(&gorm.Session{PrepareStmt: true, DryRun: true})

// use Table instead of Model to reduce reflect times

// prepare statement for GetOrCreate
tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, "tx").First(&Allocation{})

}
12 changes: 10 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const (
CHUNK_SIZE = 64 * KB
)

const (
TableNameAllocation = "allocations"
)

type Allocation struct {
ID string `gorm:"column:id;primary_key"`
Tx string `gorm:"column:tx"`
Expand All @@ -44,7 +48,7 @@ type Allocation struct {
}

func (Allocation) TableName() string {
return "allocations"
return TableNameAllocation
}

// RestDurationInTimeUnits returns number (float point) of time units until
Expand Down Expand Up @@ -196,6 +200,10 @@ func (p *Pending) Save(tx *gorm.DB) error {
return tx.Save(p).Error
}

const (
TableNameTerms = "terms"
)

// Terms for allocation by its Tx.
type Terms struct {
ID int64 `gorm:"column:id;primary_key"`
Expand All @@ -207,7 +215,7 @@ type Terms struct {
}

func (*Terms) TableName() string {
return "terms"
return TableNameTerms
}

type ReadPool struct {
Expand Down
87 changes: 87 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/zcn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package allocation

import (
"encoding/json"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
"github.com/0chain/errors"
"gorm.io/gorm"
)

// SyncAllocation try to pull allocation from blockchain, and insert it in db.
func SyncAllocation(allocationTx string) (*Allocation, error) {
t, err := transaction.VerifyTransaction(allocationTx, chain.GetServerChain())
if err != nil {
return nil, errors.Throw(common.ErrBadRequest,
"Invalid Allocation id. Allocation not found in blockchain.")
}
var sa transaction.StorageAllocation
err = json.Unmarshal([]byte(t.TransactionOutput), &sa)
if err != nil {
return nil, errors.ThrowLog(err.Error(), common.ErrInternal, "Error decoding the allocation transaction output.")
}

alloc := &Allocation{}

belongToThisBlobber := false
for _, blobberConnection := range sa.Blobbers {
if blobberConnection.ID == node.Self.ID {
belongToThisBlobber = true

alloc.AllocationRoot = ""
alloc.BlobberSize = (sa.Size + int64(len(sa.Blobbers)-1)) /
int64(len(sa.Blobbers))
alloc.BlobberSizeUsed = 0

break
}
}
if !belongToThisBlobber {
return nil, errors.Throw(common.ErrBadRequest,
"Blobber is not part of the open connection transaction")
}

// set/update fields
alloc.ID = sa.ID
alloc.Tx = sa.Tx
alloc.Expiration = sa.Expiration
alloc.OwnerID = sa.OwnerID
alloc.OwnerPublicKey = sa.OwnerPublicKey
alloc.RepairerID = t.ClientID // blobber node id
alloc.TotalSize = sa.Size
alloc.UsedSize = sa.UsedSize
alloc.Finalized = sa.Finalized
alloc.TimeUnit = sa.TimeUnit
alloc.IsImmutable = sa.IsImmutable

// related terms
terms := make([]*Terms, 0, len(sa.BlobberDetails))
for _, d := range sa.BlobberDetails {
terms = append(terms, &Terms{
BlobberID: d.BlobberID,
AllocationID: alloc.ID,
ReadPrice: d.Terms.ReadPrice,
WritePrice: d.Terms.WritePrice,
})
}

err = datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error {
if err := tx.Table(TableNameAllocation).Create(alloc).Error; err != nil {
return err
}

for _, term := range terms {
if err := tx.Table(TableNameTerms).Create(term).Error; err != nil {
return err
}
}

return nil
})

return alloc, err
}
3 changes: 3 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func SetupDefaultConfig() {
viper.SetDefault("challenge_completion_time", time.Duration(-1))
viper.SetDefault("read_lock_timeout", time.Duration(-1))
viper.SetDefault("write_lock_timeout", time.Duration(-1))
viper.SetDefault("write_marker_lock_timeout", time.Second*30)

viper.SetDefault("delegate_wallet", "")
viper.SetDefault("min_stake", 1.0)
Expand Down Expand Up @@ -115,6 +116,8 @@ type Config struct {

ReadLockTimeout int64 // seconds
WriteLockTimeout int64 // seconds
// WriteMarkerLockTimeout lock is released automatically if it is timeout
WriteMarkerLockTimeout time.Duration

UpdateAllocationsInterval time.Duration

Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/datastore/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (store *postgresStore) GetDB() *gorm.DB {

func (store *postgresStore) AutoMigrate() error {

err := store.db.AutoMigrate(&Migration{})
err := store.db.AutoMigrate(&Migration{}, &WriteLock{})
if err != nil {
logging.Logger.Error("[db]", zap.Error(err))
}
Expand Down
19 changes: 19 additions & 0 deletions code/go/0chain.net/blobbercore/datastore/postgres_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package datastore

import "time"

const (
TableNameWriteLock = "write_locks"
)

// WriteLock WriteMarker lock
type WriteLock struct {
AllocationID string `gorm:"primaryKey, column:allocation_id"`
SessionID string `gorm:"column:session_id"`
CreatedAt time.Time `gorm:"column:created_at"`
}

// TableName get table name of migrate
func (WriteLock) TableName() string {
return TableNameWriteLock
}
Loading