Commit 0ce821d

Merge remote-tracking branch 'origin/fix/challenge-worker' into fix/loop-break

# Conflicts:
#	code/go/0chain.net/blobbercore/challenge/worker.go
#	go.mod
#	go.sum
dabasov committed Nov 13, 2023
2 parents 838335e + c284724 commit 0ce821d
Showing 27 changed files with 385 additions and 180 deletions.
11 changes: 9 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
@@ -239,23 +239,30 @@ func (a *AllocationChangeCollector) CommitToFileStore(ctx context.Context) error
// Can be configured at runtime, this number will depend on the number of active allocations
swg := sizedwaitgroup.New(5)
mut := &sync.Mutex{}
var (
commitError error
errorMutex sync.Mutex
)
for _, change := range a.AllocationChanges {
select {
case <-commitCtx.Done():
return fmt.Errorf("commit to filestore failed")
return fmt.Errorf("commit to filestore failed: %s", commitError.Error())
default:
}
swg.Add()
go func(change AllocationChangeProcessor) {
err := change.CommitToFileStore(ctx, mut)
if err != nil && !errors.Is(common.ErrFileWasDeleted, err) {
cancel()
errorMutex.Lock()
commitError = err
errorMutex.Unlock()
}
swg.Done()
}(change)
}
swg.Wait()
return nil
return commitError
}
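The hunk above caps the commit fan-out with a sized wait group and records the first goroutine failure behind a small mutex. As a standalone illustration of that shape, here is a minimal sketch using only the standard library; every name in it is invented for the example and nothing below is repository code.

package allocationexample

import (
	"context"
	"sync"
)

// commitAll runs each change with at most `limit` goroutines in flight,
// remembers the first error behind a mutex, and cancels the context so the
// scheduling loop stops early once something has failed.
func commitAll(ctx context.Context, changes []func(context.Context) error, limit int) error {
	var (
		firstErr error
		errMu    sync.Mutex
		wg       sync.WaitGroup
	)
	sem := make(chan struct{}, limit) // bounded-concurrency semaphore

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for _, change := range changes {
		if ctx.Err() != nil { // a previous change failed or the caller gave up
			break
		}
		sem <- struct{}{} // blocks once `limit` goroutines are running
		wg.Add(1)
		go func(commit func(context.Context) error) {
			defer func() { <-sem; wg.Done() }()
			if err := commit(ctx); err != nil {
				errMu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				errMu.Unlock()
				cancel() // stop scheduling the remaining changes
			}
		}(change)
	}
	wg.Wait()
	return firstErr // nil when every change committed
}

The same idea could also be written with golang.org/x/sync/errgroup and SetLimit; the mutex-plus-cancel version is shown only because it matches the structure of the diff.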

func (a *AllocationChangeCollector) DeleteChanges(ctx context.Context) {
@@ -21,7 +21,7 @@ type UploadFileChanger struct {
}

// ApplyChange update references, and create a new FileRef
func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
func (nf *UploadFileChanger) applyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {

totalRefs, err := reference.CountRefs(ctx, nf.AllocationID)
@@ -0,0 +1,28 @@
//go:build integration_tests
// +build integration_tests

package allocation

import (
"context"
"errors"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
)

func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {

state := conductrpc.Client().State()
if state.FailUploadCommit != nil {
for _, nodeId := range state.FailUploadCommit {
if nodeId == node.Self.ID {
return nil, errors.New("error directed by conductor")
}
}
}
return nf.applyChange(ctx, rootRef, change, allocationRoot, ts, fileIDMeta)
}
@@ -0,0 +1,15 @@
//go:build !integration_tests
// +build !integration_tests

package allocation

import (
"context"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)

func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {
return nf.applyChange(ctx, rootRef, change, allocationRoot, ts, fileIDMeta)
}
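The two new files above follow a build-tag seam: the exported ApplyChange is renamed to the unexported applyChange, and each build flavour contributes its own exported wrapper, so conductor fault injection only exists in integration_tests builds. Below is a compressed, illustrative sketch of the production-side wrapper (not repository code); in a real layout the shared helper would sit in an untagged file so both flavours can call it.

//go:build !integration_tests

package example

// doWork stands in for the shared unexported implementation (normally kept in
// an untagged file so both build flavours can reach it).
func doWork(x int) int { return x * 2 }

// DoWork is the production wrapper: a plain passthrough with no test hooks.
// The integration_tests flavour defines its own DoWork that consults the
// conductor state and may fail on purpose before delegating to doWork.
func DoWork(x int) int { return doWork(x) }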
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobbercore/challenge/entity.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
@@ -106,6 +107,7 @@ type ChallengeEntity struct {
RoundCreatedAt int64 `gorm:"round_created_at" json:"round_created_at"`
CreatedAt common.Timestamp `gorm:"created_at" json:"created"`
UpdatedAt time.Time `gorm:"updated_at;type:timestamp without time zone;not null;default:current_timestamp" json:"-"`
statusMutex *sync.Mutex `gorm:"-" json:"-"`
}

func (ChallengeEntity) TableName() string {
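For readers unfamiliar with the tags on the new statusMutex field: `gorm:"-"` tells GORM to skip the field entirely and `json:"-"` keeps it out of serialised output, so the mutex lives only in memory (an unexported field would be skipped by encoding/json anyway; the tags mostly document intent). A tiny illustrative struct, not repository code:

package example

import "sync"

// Only ID and Status are persisted and serialised; statusMutex is ignored by
// both GORM and encoding/json and exists purely for in-process locking.
type record struct {
	ID          string      `gorm:"column:id" json:"id"`
	Status      int         `gorm:"column:status" json:"status"`
	statusMutex *sync.Mutex `gorm:"-" json:"-"`
}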
10 changes: 9 additions & 1 deletion code/go/0chain.net/blobbercore/challenge/protocol.go
@@ -49,7 +49,9 @@ func (cr *ChallengeEntity) CancelChallenge(ctx context.Context, errReason error)
cancellation := time.Now()
db := datastore.GetStore().GetTransaction(ctx)
deleteChallenge(cr.RoundCreatedAt)
cr.statusMutex.Lock()
cr.Status = Cancelled
cr.statusMutex.Unlock()
cr.StatusMessage = errReason.Error()
cr.UpdatedAt = cancellation.UTC()
if err := db.Save(cr).Error; err != nil {
@@ -225,7 +227,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
cr.ValidationTickets = make([]*ValidationTicket, len(cr.Validators))
}

accessMu := sync.Mutex{}
accessMu := sync.RWMutex{}
updateMapAndSlice := func(validatorID string, i int, vt *ValidationTicket) {
accessMu.Lock()
cr.ValidationTickets[i] = vt
@@ -306,10 +308,14 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}
}

accessMu.RLock()
logging.Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses), zap.Any("num_success", numSuccess), zap.Any("num_failed", numFailed))
accessMu.RUnlock()
if numSuccess > (len(cr.Validators) / 2) {
cr.Result = ChallengeSuccess
cr.statusMutex.Lock()
cr.Status = Processed
cr.statusMutex.Unlock()
cr.UpdatedAt = time.Now().UTC()
} else {
cr.CancelChallenge(ctx, ErrNoConsensusChallenge)
@@ -376,7 +382,9 @@ func IsEntityNotFoundError(err error) bool {
}

func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transaction.Transaction, toAdd bool) {
cr.statusMutex.Lock()
cr.Status = Committed
cr.statusMutex.Unlock()
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
cr.UpdatedAt = time.Now().UTC()
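The accessMu change above (sync.Mutex to sync.RWMutex) lets the read-only logging of the response counters take a shared lock while the ticket writers keep the exclusive one. A minimal illustration of that split, with invented names and not taken from the repository:

package challengeexample

import "sync"

// ticketBook shows the read/write split: writers that mutate the map take the
// exclusive lock, read-only users take the shared lock, so concurrent readers
// no longer serialise each other.
type ticketBook struct {
	mu      sync.RWMutex
	tickets map[string]string
}

func newTicketBook() *ticketBook {
	return &ticketBook{tickets: map[string]string{}}
}

func (b *ticketBook) put(id, ticket string) {
	b.mu.Lock() // exclusive: mutates the map
	defer b.mu.Unlock()
	b.tickets[id] = ticket
}

func (b *ticketBook) count() int {
	b.mu.RLock() // shared: many readers may hold this at once
	defer b.mu.RUnlock()
	return len(b.tickets)
}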
17 changes: 15 additions & 2 deletions code/go/0chain.net/blobbercore/challenge/worker.go
@@ -210,9 +210,8 @@ func commitOnChainWorker(ctx context.Context) {

func getBatch(batchSize int) (chall []ChallengeEntity) {
challengeMapLock.RLock()
defer challengeMapLock.RUnlock()

if challengeMap.Size() == 0 {
challengeMapLock.RUnlock()
return
}

@@ -223,6 +222,7 @@ func getBatch(batchSize int) (chall []ChallengeEntity) {
break
}
ticket := it.Value().(*ChallengeEntity)
ticket.statusMutex.Lock()
switch ticket.Status {
case Committed:
case Cancelled:
@@ -235,6 +235,13 @@
case Processed:
default:
}
if ticket.Status != Processed {
if checkExpiry(ticket.RoundCreatedAt) {
toDel = append(toDel, ticket.RoundCreatedAt)
}
ticket.statusMutex.Unlock() // release before skipping, otherwise the mutex stays held
continue
}
ticket.statusMutex.Unlock()
chall = append(chall, *ticket)
}
for _, r := range toClean {
@@ -260,6 +267,7 @@ func (it *ChallengeEntity) createChallenge(ctx context.Context) bool {
logging.Logger.Info("createChallenge", zap.String("challenge_id", it.ChallengeID), zap.String("status", "already exists"))
return false
}
it.statusMutex = &sync.Mutex{}
challengeMap.Put(it.RoundCreatedAt, it)
return true
}
@@ -269,3 +277,8 @@ func deleteChallenge(key int64) {
challengeMap.Remove(key)
challengeMapLock.Unlock()
}

func checkExpiry(roundCreatedAt int64) bool {
currentRound := roundInfo.CurrentRound + int64(float64(roundInfo.LastRoundDiff)*(float64(time.Since(roundInfo.CurrentRoundCaptureTime).Milliseconds())/float64(GetRoundInterval.Milliseconds())))
return currentRound-roundCreatedAt > config.StorageSCConfig.ChallengeCompletionTime
}
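checkExpiry extrapolates the current chain round from the last observed round, the per-interval round delta, and the wall-clock time since the observation, then treats the ticket as expired once the gap to RoundCreatedAt exceeds the challenge completion window (expressed in rounds). A self-contained sketch of the same arithmetic, with invented parameter names and example numbers:

package example

import "time"

// estimateCurrentRound mirrors the extrapolation in checkExpiry: if the chain
// was at lastRound when captured, was advancing by lastRoundDiff rounds per
// roundInterval, and some wall-clock time has since elapsed, the current round
// is roughly lastRound + lastRoundDiff * elapsed/roundInterval.
func estimateCurrentRound(lastRound, lastRoundDiff int64, capturedAt time.Time, roundInterval time.Duration) int64 {
	elapsed := float64(time.Since(capturedAt).Milliseconds())
	return lastRound + int64(float64(lastRoundDiff)*(elapsed/float64(roundInterval.Milliseconds())))
}

// isExpired reports whether a challenge created at roundCreatedAt has outlived
// the completion window (a round count, despite the "time" in the config name).
func isExpired(roundCreatedAt, lastRound, lastRoundDiff int64, capturedAt time.Time, roundInterval time.Duration, completionRounds int64) bool {
	return estimateCurrentRound(lastRound, lastRoundDiff, capturedAt, roundInterval)-roundCreatedAt > completionRounds
}

// Example: a captured round of 1_000_000 advancing by 20 rounds per 10s
// interval, observed 25s ago, gives an estimate of roughly 1_000_050.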
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/filestore/storage.go
@@ -528,6 +528,7 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
dataSize: readBlockIn.FileSize,
}

logging.Logger.Debug("calling GetMerkleProofOfMultipleIndexes", zap.Any("readBlockIn", readBlockIn), zap.Any("vmp", vmp))
nodes, indexes, err := vp.GetMerkleProofOfMultipleIndexes(file, nodesSize, startBlock, endBlock)
if err != nil {
return nil, common.NewError("get_merkle_proof_error", err.Error())
@@ -288,7 +288,7 @@ type validationTreeProof struct {
// If endInd - startInd is whole file then no proof is required at all.
// startInd and endInd is taken as closed interval. So to get proof for data at index 0 both startInd
// and endInd would be 0.
func (v *validationTreeProof) GetMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
func (v *validationTreeProof) getMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
[][][]byte, [][]int, error) {

if startInd < 0 || endInd < 0 {
@@ -0,0 +1,59 @@
//go:build integration_tests
// +build integration_tests

package filestore

import (
"io"

crpc "github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"go.uber.org/zap"
)

func (v *validationTreeProof) GetMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
[][][]byte, [][]int, error) {
nodes, indexes, err := v.getMerkleProofOfMultipleIndexes(r, nodesSize, startInd, endInd)
if err != nil {
return nodes, indexes, err
}

state := crpc.Client().State()
if state == nil {
return nodes, indexes, err
}

logging.Logger.Debug("[conductor] just after getMerkleProofOfMultipleIndexes",
zap.Bool("miss_up_download", state.MissUpDownload),
zap.Int("start_ind", startInd),
zap.Int("end_ind", endInd),
zap.Int64("nodes_size", nodesSize),
zap.Int("output_nodes_size", len(nodes)),
zap.Int("output_indexes_size", len(indexes)),
zap.Any("nodes", nodes),
zap.Any("indexes", indexes),
)

if state.MissUpDownload {
logging.Logger.Debug("miss up/download",
zap.Bool("miss_up_download", state.MissUpDownload),
zap.Int("start_ind", startInd),
zap.Int("end_ind", endInd),
zap.Int64("nodes_size", nodesSize),
zap.Int("output_nodes_size", len(nodes)),
zap.Int("output_indexes_size", len(indexes)),
zap.Any("nodes", nodes),
zap.Any("indexes", indexes),
)

for i := range nodes {
nodes[i][0] = []byte("wrong data")
}

for i := range indexes {
indexes[i][0] = 0
}
}

return nodes, indexes, err
}
11 changes: 11 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/tree_validation_main.go
@@ -0,0 +1,11 @@
//go:build !integration_tests
// +build !integration_tests

package filestore

import "io"

func (v *validationTreeProof) GetMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
[][][]byte, [][]int, error) {
return v.getMerkleProofOfMultipleIndexes(r, nodesSize, startInd, endInd)
}
13 changes: 6 additions & 7 deletions code/go/0chain.net/blobbercore/handler/handler_download_test.go
@@ -301,6 +301,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set(common.ClientSignatureHeader, sign)
r.Header.Set(common.ClientHeader, alloc.OwnerID)
r.Header.Set(common.ClientKeyHeader, alloc.OwnerPublicKey)
@@ -478,7 +479,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", pathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
@@ -559,7 +560,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", pathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
@@ -673,7 +674,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", filePathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
@@ -793,7 +794,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", filePathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
@@ -912,7 +913,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", filePathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
@@ -1005,9 +1006,7 @@ func TestHandlers_Download(t *testing.T) {
m := make(map[string]interface{})
err = json.Unmarshal(data, &m)
require.NoError(t, err)

if test.wantCode != http.StatusOK || test.wantBody != "" {
fmt.Println("fprint", test.args.w.Body.String())
var body string
if m["Data"] != nil {
body = m["Data"].(string)
@@ -25,6 +25,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/node"

"go.uber.org/zap"
@@ -379,6 +380,7 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
IsPrecommit: fromPreCommit,
}

logging.Logger.Info("calling GetFileBlock for thumb", zap.Any("rbi", rbi))
fileDownloadResponse, err = filestore.GetFileStore().GetFileBlock(rbi)
if err != nil {
return nil, common.NewErrorf("download_file", "couldn't get thumbnail block: %v", err)
@@ -398,6 +400,7 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
VerifyDownload: dr.VerifyDownload,
IsPrecommit: fromPreCommit,
}
logging.Logger.Info("calling GetFileBlock", zap.Any("rbi", rbi))
fileDownloadResponse, err = filestore.GetFileStore().GetFileBlock(rbi)
if err != nil {
return nil, common.NewErrorf("download_file", "couldn't get file block: %v", err)
@@ -433,6 +436,9 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
addDailyBlocks(clientID, dr.NumBlocks)
AddDownloadedData(clientID, dr.NumBlocks)
}()
if !dr.VerifyDownload {
return fileDownloadResponse.Data, nil
}
return fileDownloadResponse, nil
}

8 changes: 5 additions & 3 deletions code/go/0chain.net/blobbercore/stats/blobberstats.go
@@ -105,10 +105,12 @@ func SetupStatsWorker(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(statsHandlerPeriod):
newFs := &BlobberStats{}
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
fs.loadBasicStats(ctx)
fs.loadDetailedStats(ctx)
fs.loadFailedChallengeList(ctx)
newFs.loadBasicStats(ctx)
newFs.loadDetailedStats(ctx)
newFs.loadFailedChallengeList(ctx)
fs = newFs
return common.NewError("rollback", "read_only")
})
}
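The SetupStatsWorker hunk builds each refresh into a fresh BlobberStats and only then swaps it into fs, so readers never see a half-populated snapshot; it also returns a deliberate "rollback"/"read_only" error so the read-only transaction is rolled back rather than committed (assuming WithNewTransaction rolls back when the callback errors). A generic sketch of the snapshot-and-swap idea, not repository code:

package statsexample

import (
	"context"
	"sync/atomic"
)

// Stats stands in for the blobber statistics aggregate.
type Stats struct {
	ReadCount  int64
	WriteCount int64
}

var current atomic.Pointer[Stats]

// refresh loads a brand-new snapshot and publishes it in a single store, so
// readers always observe either the old snapshot or the fully built new one.
// atomic.Pointer is used purely for the sketch; the handler-side read path is
// an assumption, not taken from the repository.
func refresh(ctx context.Context, load func(context.Context, *Stats) error) error {
	fresh := &Stats{}
	if err := load(ctx, fresh); err != nil {
		return err // keep serving the previous snapshot on failure
	}
	current.Store(fresh)
	return nil
}

func snapshot() *Stats { return current.Load() }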
