Skip to content

Commit

Permalink
Dep/update 2 (#1171)
Browse files Browse the repository at this point in the history
* Add path in thumbnail hash (#1098)

* get path from changes

* pass rootRef in apply changes

* add logs

* fix deletechange

* fix multi op and copy

* fix delete root dir

* fix delete root change

* fix lint issue

* cleanup and stats change

* rename func to processMove

* calculate object tree in ref path

* rmv return from deleteChange

* revert changes

* move to filestore changes

* check prevRoot

* add logs and condition

* check filestore

* requested changes

* validate nil proof

* add move and copy dir tests

* add logs

* use path to add child

* fix error log

* fix add child

* add proof log

* add more logs

* change read log

* log objectPath

* add commit logs

* commit thumbnail first

* add missing param

* add missing param

* add path in the thumbnail hash

* add allocID and validation root index

* use write

* add condn in count query

* rmv thumbnail_filename

* log wm

* cleanup

* add rollback wm check

* allow empty allocation root

* fix tests

* check path in upload

* fix check

* empty commit

---------

Co-authored-by: Kishan Dhakan <42718091+Kishan-Dhakan@users.noreply.github.com>
Co-authored-by: Yury <yuderbasov@gmail.com>

* Dep/update (#1153)

* dependencies update

* dependencies update

* Remove fileID from fileMetaHash (#1114)

* rmv fileID from hash calc

* Trigger Build

* adding false commit to restart systemtests

* add path in fileMetaHash

* empty commit

---------

Co-authored-by: Yury <yuderbasov@gmail.com>
Co-authored-by: shahnawaz-creator <117025384+shahnawaz-creator@users.noreply.github.com>
Co-authored-by: boddumanohar <b.manu199@gmail.com>
Co-authored-by: Kishan Dhakan <42718091+Kishan-Dhakan@users.noreply.github.com>

* optimize image (#1148)

Co-authored-by: boddumanohar <b.manu199@gmail.com>

* once for logger init

* Update challenge timing submission (#1140)

* update challenge timing submission

* fix createdAt in challenge timing table

---------

Co-authored-by: Yury <yuderbasov@gmail.com>

* Fix blobber size (#1163)

* Do not redeem readmarkers for free reads (#1166)

* do not redeem readmarkers for free reads

* fix unit tests, remove readmarker handling from download method

* updated dependencies

---------

Co-authored-by: Hitenjain14 <57557631+Hitenjain14@users.noreply.github.com>
Co-authored-by: Kishan Dhakan <42718091+Kishan-Dhakan@users.noreply.github.com>
Co-authored-by: shahnawaz-creator <117025384+shahnawaz-creator@users.noreply.github.com>
Co-authored-by: boddumanohar <b.manu199@gmail.com>
Co-authored-by: Manali-Jain-squareops <86873900+Manali-Jain-squareops@users.noreply.github.com>
Co-authored-by: Jayash Satolia <73050737+Jayashsatolia403@users.noreply.github.com>
Co-authored-by: stewartie4 <ryanstewart456@gmail.com>
Co-authored-by: Dinmukhammed <52813950+din-mukhammed@users.noreply.github.com>
  • Loading branch information
9 people committed Jul 11, 2023
1 parent c914eb7 commit 80cf2b7
Show file tree
Hide file tree
Showing 29 changed files with 442 additions and 623 deletions.
20 changes: 10 additions & 10 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Expand Up @@ -266,9 +266,11 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {
for _, ref := range refs {

var count int64
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=? AND type=?", a.AllocationID, ref.PrevValidationRoot, reference.FILE).
Count(&count)
if ref.PrevValidationRoot != "" {
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot).
Count(&count)
}

limitCh <- struct{}{}
wg.Add(1)
Expand All @@ -279,13 +281,11 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {
wg.Done()
}()

if count == 0 {
if ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
Expand Down
12 changes: 12 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Expand Up @@ -139,6 +139,18 @@ func (a *Allocation) GetRequiredWriteBalance(blobberID string, writeSize int64,
return
}

// IsReadFree Determine if read price is 0
func (a *Allocation) IsReadFree(blobberID string) bool {
for _, d := range a.Terms {
if d.BlobberID == blobberID {
if d.ReadPrice == 0 {
return true
}
}
}
return false
}

type Pending struct {
// ID of format client_id:allocation_id
ID string `gorm:"column:id;primaryKey"`
Expand Down
21 changes: 10 additions & 11 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Expand Up @@ -74,16 +74,6 @@ func (fc *BaseFileChanger) DeleteTempFile() error {

func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {

fileInputData := &filestore.FileInputData{}
fileInputData.Name = fc.Filename
fileInputData.Path = fc.Path
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
if fc.ThumbnailSize > 0 {
fileInputData := &filestore.FileInputData{}
fileInputData.Name = fc.ThumbnailFilename
Expand All @@ -96,6 +86,15 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
return common.NewError("file_store_error", "Error committing thumbnail to file store. "+err.Error())
}
}

fileInputData := &filestore.FileInputData{}
fileInputData.Name = fc.Filename
fileInputData.Path = fc.Path
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
return nil
}
Expand Up @@ -97,7 +97,6 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
fileRef.EncryptedKey = nf.EncryptedKey
fileRef.ChunkSize = nf.ChunkSize
fileRef.IsPrecommit = true
fileRef.ThumbnailFilename = nf.ThumbnailFilename

return rootRef, nil
}
Expand Down
Expand Up @@ -103,7 +103,6 @@ func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference
UpdatedAt: ts,
HashToBeComputed: true,
IsPrecommit: true,
ThumbnailFilename: nf.ThumbnailFilename,
}

fileID, ok := fileIDMeta[newFile.Path]
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
Expand Down Expand Up @@ -103,8 +104,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
}
foundBlobber = true
a.AllocationRoot = ""
a.BlobberSize = (sa.Size + int64(len(sa.BlobberDetails)-1)) /
int64(len(sa.BlobberDetails))
a.BlobberSize = int64(math.Ceil(float64(sa.Size) / float64(sa.DataShards)))
a.BlobberSizeUsed = 0
break
}
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/zcn.go
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"github.com/0chain/errors"
"gorm.io/gorm"
"math"
)

// SyncAllocation try to pull allocation from blockchain, and insert it in db.
Expand All @@ -24,8 +25,7 @@ func SyncAllocation(allocationId string) (*Allocation, error) {
belongToThisBlobber = true

alloc.AllocationRoot = ""
alloc.BlobberSize = (sa.Size + int64(len(sa.BlobberDetails)-1)) /
int64(len(sa.BlobberDetails))
alloc.BlobberSize = int64(math.Ceil(float64(sa.Size) / float64(sa.DataShards)))
alloc.BlobberSizeUsed = 0

break
Expand Down
43 changes: 26 additions & 17 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/util"
sdkUtil "github.com/0chain/gosdk/core/util"
"github.com/remeh/sizedwaitgroup"
"gorm.io/gorm"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -96,35 +97,40 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}

rootRef, err := reference.GetReference(ctx, cr.AllocationID, "/")
if err != nil {
if err != nil && err != gorm.ErrRecordNotFound {
allocMu.RUnlock()
cr.CancelChallenge(ctx, err)
return err
}

blockNum := int64(0)
if rootRef.NumBlocks > 0 {
r := rand.New(rand.NewSource(cr.RandomNumber))
blockNum = r.Int63n(rootRef.NumBlocks)
blockNum++
cr.BlockNum = blockNum
}
var objectPath *reference.ObjectPath
if rootRef != nil {
if rootRef.NumBlocks > 0 {
r := rand.New(rand.NewSource(cr.RandomNumber))
blockNum = r.Int63n(rootRef.NumBlocks)
blockNum++
cr.BlockNum = blockNum
}

logging.Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
objectPath, err := reference.GetObjectPath(ctx, cr.AllocationID, blockNum)
if err != nil {
allocMu.RUnlock()
cr.CancelChallenge(ctx, err)
return err
}
logging.Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
objectPath, err = reference.GetObjectPath(ctx, cr.AllocationID, blockNum)
if err != nil {
allocMu.RUnlock()
cr.CancelChallenge(ctx, err)
return err
}

cr.RefID = objectPath.RefID
cr.RefID = objectPath.RefID
cr.ObjectPath = objectPath
}
cr.RespondedAllocationRoot = allocationObj.AllocationRoot
cr.ObjectPath = objectPath

postData := make(map[string]interface{})
postData["challenge_id"] = cr.ChallengeID
postData["object_path"] = objectPath
if objectPath != nil {
postData["object_path"] = objectPath
}
markersArray := make([]map[string]interface{}, 0)
for _, wm := range wms {
markersMap := make(map[string]interface{})
Expand Down Expand Up @@ -188,6 +194,9 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
postData["challenge_proof"] = challengeResponse
}

if objectPath == nil {
objectPath = &reference.ObjectPath{}
}
err = UpdateChallengeTimingProofGenerationAndFileSize(
cr.ChallengeID,
proofGenTime,
Expand Down
36 changes: 15 additions & 21 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Expand Up @@ -26,6 +26,7 @@ package filestore
//

import (
"bytes"
"context"
"encoding/hex"
"errors"
Expand Down Expand Up @@ -122,7 +123,6 @@ func (fs *FileStore) MoveToFilestore(allocID, hash string) error {
}

_ = os.Rename(preCommitPath, fPath)

return nil
}

Expand All @@ -132,7 +132,7 @@ func (fs *FileStore) DeleteFromFilestore(allocID, hash string) error {
if err != nil {
return common.NewError("get_file_path_error", err.Error())
}

logging.Logger.Info("Deleting file from filestore", zap.String("path", fPath))
err = os.Remove(fPath)
if err != nil {
return common.NewError("blob_object_dir_creation_error", err.Error())
Expand All @@ -159,7 +159,8 @@ func (fs *FileStore) DeletePreCommitDir(allocID string) error {

func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData) (bool, error) {

logging.Logger.Info("Committing file")
logging.Logger.Info("Committing write", zap.String("allocation_id", allocID), zap.Any("file_data", fileData))

tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, encryption.Hash(fileData.Path), conID)

fileHash := fileData.ValidationRoot
Expand All @@ -181,25 +182,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)

r, err := os.Open(tempFilePath)
if err != nil {

if errors.Is(err, os.ErrNotExist) {
f.Close()
_ = os.Remove(preCommitPath)
return true, nil
}
return false, err
} else {
// check if file is empty
check_file, err := os.Stat(tempFilePath)
if err == nil && check_file.Size() == 0 {
f.Close()
_ = os.Remove(preCommitPath)
return true, nil
} else if err != nil {
f.Close()
_ = os.Remove(preCommitPath)
return false, err
}
}

defer f.Close()
Expand All @@ -220,6 +203,10 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
if err != nil {
return false, common.NewError("read_error", err.Error())
}
_, err = h.Write([]byte(fileData.Path))
if err != nil {
return false, common.NewError("read_error", err.Error())
}
hash := hex.EncodeToString(h.Sum(nil))
if hash != fileData.ThumbnailHash {
return false, common.NewError("hash_mismatch",
Expand Down Expand Up @@ -803,6 +790,13 @@ func (fs *FileStore) updateAllocTempFileSize(allocID string, size int64) {
alloc.tmpFileSize += uint64(size)
}

func (fs *FileStore) pathWriter(path string) io.Reader {

pathBytes := []byte(path)
buf := bytes.NewBuffer(pathBytes)
return buf
}

// GetTempFilesSizeOfAllocation Get total file sizes of all allocation that are not yet committed
func (fs *FileStore) GetTotalTempFileSizes() (s uint64) {
for _, alloc := range fs.mAllocs {
Expand Down

0 comments on commit 80cf2b7

Please sign in to comment.