Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions code/go/0chain.net/blobber/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ func setupDatabase() error {
fmt.Print("\r[7/11] connect data store")
// check for database connection
for i := 0; i < 600; i++ {

if i > 0 {
fmt.Printf("\r[7/10] connect(%v) data store", i)
}
Expand All @@ -27,7 +26,6 @@ func setupDatabase() error {
}

time.Sleep(1 * time.Second)

}

return nil
Expand Down
1 change: 0 additions & 1 deletion code/go/0chain.net/blobber/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var (
)

func init() {

flag.IntVar(&deploymentMode, "deployment_mode", 2, "deployment mode: 0=dev,1=test, 2=mainnet")
flag.StringVar(&keysFile, "keys_file", "", "keys_file")
flag.StringVar(&minioFile, "minio_file", "", "minio_file")
Expand Down
1 change: 0 additions & 1 deletion code/go/0chain.net/blobber/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
)

func startGRPCServer() {

common.ConfigRateLimits()
r := mux.NewRouter()
initHandlers(r)
Expand Down
2 changes: 0 additions & 2 deletions code/go/0chain.net/blobber/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
)

func startHttpServer() {

mode := "main net"
if config.Development() {
mode = "development"
Expand All @@ -42,7 +41,6 @@ func startHttpServer() {
fmt.Println("[11/11] start http server [OK]")

wg.Wait()

}

func startServer(wg *sync.WaitGroup, r *mux.Router, mode string, port int, isTls bool) {
Expand Down
1 change: 0 additions & 1 deletion code/go/0chain.net/blobber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,4 @@ func main() {
startGRPCServer()

startHttpServer()

}
1 change: 0 additions & 1 deletion code/go/0chain.net/blobber/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func setupNode() error {

reader, err := os.Open(keysFile)
if err != nil {

return err
}
defer reader.Close()
Expand Down
3 changes: 0 additions & 3 deletions code/go/0chain.net/blobber/zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ func setupOnChain() {
fmt.Print(" [SKIP]\n")
break
} else {

if err := registerBlobberOnChain(); err != nil {
if i == 10 { // no more attempts
panic(err)
}
fmt.Print("\n ", err.Error()+"\n")

} else {
fmt.Print(" [OK]\n")
break
Expand All @@ -62,7 +60,6 @@ func setupOnChain() {
fmt.Printf("\r - wait %v seconds to retry", ATTEMPT_DELAY-n)
}
}

}
if !isIntegrationTest {
go setupWorkers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (change *AllocationChange) Save(ctx context.Context) error {
// GetAllocationChanges reload connection's changes in allocation from postgres.
// 1. update connection's status with NewConnection if connection_id is not found in postgres
// 2. mark as NewConnection if connection_id is marked as DeleteConnection
func GetAllocationChanges(ctx context.Context, connectionID string, allocationID string, clientID string) (*AllocationChangeCollector, error) {
func GetAllocationChanges(ctx context.Context, connectionID, allocationID, clientID string) (*AllocationChangeCollector, error) {
cc := &AllocationChangeCollector{}
db := datastore.GetStore().GetTransaction(ctx)
err := db.Where("connection_id = ? and allocation_id = ? and client_id = ? and status <> ?",
Expand Down Expand Up @@ -102,7 +102,6 @@ func (cc *AllocationChangeCollector) AddChange(allocationChange *AllocationChang
}

func (cc *AllocationChangeCollector) Save(ctx context.Context) error {

db := datastore.GetStore().GetTransaction(ctx)
if cc.Status == NewConnection {
cc.Status = InProgressConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ type AttributesChange struct {
}

// ProcessChange processes the attributes changes.
func (ac *AttributesChange) ProcessChange(ctx context.Context,
_ *AllocationChange, allocRoot string) (ref *reference.Ref, err error) {

func (ac *AttributesChange) ProcessChange(ctx context.Context, _ *AllocationChange, allocRoot string) (ref *reference.Ref, err error) {
var path, _ = filepath.Split(ac.Path)
path = filepath.Clean(path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (rf *CopyFileChange) ProcessChange(ctx context.Context, change *AllocationC
return rootRef, err
}

func (rf *CopyFileChange) processCopyRefs(ctx context.Context, affectedRef *reference.Ref, destRef *reference.Ref, allocationRoot string) {
func (rf *CopyFileChange) processCopyRefs(ctx context.Context, affectedRef, destRef *reference.Ref, allocationRoot string) {
if affectedRef.Type == reference.DIRECTORY {
newRef := reference.NewDirectoryRef()
newRef.AllocationID = rf.AllocationID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type DeleteFileChange struct {
}

func (nf *DeleteFileChange) ProcessChange(ctx context.Context, change *AllocationChange, allocationRoot string) (*reference.Ref, error) {

rootRef, contentHash, err := reference.DeleteObject(ctx, nf.AllocationID, nf.Path)
if err != nil {
return nil, err
Expand Down
38 changes: 9 additions & 29 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ func (Allocation) TableName() string {

// RestDurationInTimeUnits returns number (float point) of time units until
// allocation ends.
func (a *Allocation) RestDurationInTimeUnits(wmt common.Timestamp) (
rdtu float64) {

func (a *Allocation) RestDurationInTimeUnits(wmt common.Timestamp) (rdtu float64) {
var (
wmtt = time.Unix(int64(wmt), 0)
expt = time.Unix(int64(a.Expiration), 0)
Expand Down Expand Up @@ -92,9 +90,7 @@ type WantWriter interface {
// WantWrite returns amount of tokens (by current terms of the allocations that
// should be loaded) by given size for given blobber. E.g. want is tokens
// wanted.
func (a *Allocation) WantWrite(blobberID string, size int64,
wmt common.Timestamp) (value int64) {

func (a *Allocation) WantWrite(blobberID string, size int64, wmt common.Timestamp) (value int64) {
if size < 0 {
return // deleting, ignore
}
Expand All @@ -111,9 +107,7 @@ func (a *Allocation) WantWrite(blobberID string, size int64,
}

// ReadPools from DB cache.
func ReadPools(tx *gorm.DB, clientID, allocID, blobberID string,
until common.Timestamp) (rps []*ReadPool, err error) {

func ReadPools(tx *gorm.DB, clientID, allocID, blobberID string, until common.Timestamp) (rps []*ReadPool, err error) {
const query = `client_id = ? AND
allocation_id = ? AND
blobber_id = ? AND
Expand All @@ -127,9 +121,7 @@ func ReadPools(tx *gorm.DB, clientID, allocID, blobberID string,

// HaveRead is sum of read pools (the list should be filtered by query
// excluding pools expired and pools going to expired soon) minus pending reads.
func (a *Allocation) HaveRead(rps []*ReadPool, blobberID string,
pendNumBlocks int64) (have int64) {

func (a *Allocation) HaveRead(rps []*ReadPool, blobberID string, pendNumBlocks int64) (have int64) {
for _, rp := range rps {
have += rp.Balance
}
Expand All @@ -150,9 +142,7 @@ func (*Pending) TableName() string {
return "pendings"
}

func GetPending(tx *gorm.DB, clientID, allocationID, blobberID string) (
p *Pending, err error) {

func GetPending(tx *gorm.DB, clientID, allocationID, blobberID string) (p *Pending, err error) {
const query = `client_id = ? AND
allocation_id = ? AND
blobber_id = ?`
Expand Down Expand Up @@ -180,9 +170,7 @@ func (p *Pending) SubPendingWrite(size int64) {
}
}

func (p *Pending) WritePools(tx *gorm.DB, blobberID string,
until common.Timestamp) (wps []*WritePool, err error) {

func (p *Pending) WritePools(tx *gorm.DB, blobberID string, until common.Timestamp) (wps []*WritePool, err error) {
const query = `client_id = ? AND
allocation_id = ? AND
blobber_id = ? AND
Expand All @@ -194,9 +182,7 @@ func (p *Pending) WritePools(tx *gorm.DB, blobberID string,
return
}

func (p *Pending) HaveWrite(wps []*WritePool, ww WantWriter,
wmt common.Timestamp) (have int64) {

func (p *Pending) HaveWrite(wps []*WritePool, ww WantWriter, wmt common.Timestamp) (have int64) {
for _, wp := range wps {
have += wp.Balance
}
Expand Down Expand Up @@ -254,11 +240,8 @@ func (*WritePool) TableName() string {
return "write_pools"
}

func SetReadPools(db *gorm.DB, clientID, allocationID, blobberID string,
rps []*ReadPool) (err error) {

func SetReadPools(db *gorm.DB, clientID, allocationID, blobberID string, rps []*ReadPool) (err error) {
// cleanup and batch insert (remove old pools, add / update new)

const query = `client_id = ? AND
allocation_id = ? AND
blobber_id = ?`
Expand All @@ -282,9 +265,7 @@ func SetReadPools(db *gorm.DB, clientID, allocationID, blobberID string,
return
}

func SetWritePools(db *gorm.DB, clientID, allocationID, blobberID string,
wps []*WritePool) (err error) {

func SetWritePools(db *gorm.DB, clientID, allocationID, blobberID string, wps []*WritePool) (err error) {
const query = `client_id = ? AND
allocation_id = ? AND
blobber_id = ?`
Expand Down Expand Up @@ -319,7 +300,6 @@ type ReadPoolRedeem struct {

// SubReadRedeemed subtracts tokens redeemed from read pools.
func SubReadRedeemed(rps []*ReadPool, redeems []ReadPoolRedeem) {

var rm = make(map[string]int64)

for _, rd := range redeems {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ type AddFileChanger struct {
}

// ProcessChange update references, and create a new FileRef
func (nf *AddFileChanger) ProcessChange(ctx context.Context,
change *AllocationChange, allocationRoot string) (*reference.Ref, error) {

func (nf *AddFileChanger) ProcessChange(ctx context.Context, change *AllocationChange, allocationRoot string) (*reference.Ref, error) {
path, _ := filepath.Split(nf.Path)
path = filepath.Clean(path)
tSubDirs := reference.GetSubDirsFromPath(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type UpdateFileChanger struct {
}

func (nf *UpdateFileChanger) ProcessChange(ctx context.Context, change *AllocationChange, allocationRoot string) (*reference.Ref, error) {

path, _ := filepath.Split(nf.Path)
path = filepath.Clean(path)
tSubDirs := reference.GetSubDirsFromPath(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ func (nf *NewFileChange) CreateDir(ctx context.Context, allocationID, dirName, a
return rootRef, nil
}

func (nf *NewFileChange) ProcessChange(ctx context.Context,
change *AllocationChange, allocationRoot string) (*reference.Ref, error) {

func (nf *NewFileChange) ProcessChange(ctx context.Context, change *AllocationChange, allocationRoot string) (*reference.Ref, error) {
if change.Operation == constants.FileOperationCreateDir {
err := nf.Unmarshal(change.Input)
if err != nil {
Expand Down
31 changes: 12 additions & 19 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
)

// GetAllocationByID from DB. This function doesn't load related terms.
func GetAllocationByID(ctx context.Context, allocID string) (
a *Allocation, err error) {

func GetAllocationByID(ctx context.Context, allocID string) (a *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)

a = new(Allocation)
Expand Down Expand Up @@ -49,9 +47,7 @@ func (a *Allocation) LoadTerms(ctx context.Context) (err error) {
}

// VerifyAllocationTransaction try to get allocation from postgres.if it doesn't exists, get it from sharders, and insert it into postgres.
func VerifyAllocationTransaction(ctx context.Context, allocationTx string,
readonly bool) (a *Allocation, err error) {

func VerifyAllocationTransaction(ctx context.Context, allocationTx string, readonly bool) (a *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)

a = new(Allocation)
Expand Down Expand Up @@ -101,14 +97,15 @@ func VerifyAllocationTransaction(ctx context.Context, allocationTx string,
if !isExist {
foundBlobber := false
for _, blobberConnection := range sa.Blobbers {
if blobberConnection.ID == node.Self.ID {
foundBlobber = true
a.AllocationRoot = ""
a.BlobberSize = (sa.Size + int64(len(sa.Blobbers)-1)) /
int64(len(sa.Blobbers))
a.BlobberSizeUsed = 0
break
if blobberConnection.ID != node.Self.ID {
continue
}
foundBlobber = true
a.AllocationRoot = ""
a.BlobberSize = (sa.Size + int64(len(sa.Blobbers)-1)) /
int64(len(sa.Blobbers))
a.BlobberSizeUsed = 0
break
}
if !foundBlobber {
return nil, common.NewError("invalid_blobber",
Expand Down Expand Up @@ -178,9 +175,7 @@ type PoolStat struct {
ExpireAt common.Timestamp `json:"expire_at"`
}

func RequestReadPools(clientID, allocationID string) (
rps []*ReadPool, err error) {

func RequestReadPools(clientID, allocationID string) (rps []*ReadPool, err error) {
Logger.Info("request read pools")

var (
Expand Down Expand Up @@ -227,9 +222,7 @@ func RequestReadPools(clientID, allocationID string) (
return // got them
}

func RequestWritePools(clientID, allocationID string) (
wps []*WritePool, err error) {

func RequestWritePools(clientID, allocationID string) (wps []*WritePool, err error) {
Logger.Info("request write pools")

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (MockFileBlockGetter) GetFileBlock(
fileData *filestore.FileInputData,
blockNum int64,
numBlocks int64) ([]byte, error) {
return []byte(mockFileBlock), nil
return mockFileBlock, nil
}

func resetMockFileBlock() {
Expand Down
Loading