Skip to content

Commit

Permalink
Merge branch 'sprint-1.11' into feat/db-snapshot-migration
Browse files Browse the repository at this point in the history
  • Loading branch information
dabasov committed Nov 10, 2023
2 parents e2dedd3 + 577752c commit 85646cd
Show file tree
Hide file tree
Showing 17 changed files with 160 additions and 12 deletions.
Expand Up @@ -21,7 +21,7 @@ type UploadFileChanger struct {
}

// ApplyChange update references, and create a new FileRef
func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
func (nf *UploadFileChanger) applyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {

totalRefs, err := reference.CountRefs(ctx, nf.AllocationID)
Expand Down
@@ -0,0 +1,28 @@
//go:build integration_tests
// +build integration_tests

package allocation

import (
"context"
"errors"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
)

func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {

state := conductrpc.Client().State()
if state.FailUploadCommit != nil {
for _, nodeId := range state.FailUploadCommit {
if nodeId == node.Self.ID {
return nil, errors.New("error directed by conductor")
}
}
}
return nf.applyChange(ctx, rootRef, change, allocationRoot, ts, fileIDMeta)
}
@@ -0,0 +1,15 @@
//go:build !integration_tests
// +build !integration_tests

package allocation

import (
"context"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)

func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {
return nf.applyChange(ctx, rootRef, change, allocationRoot, ts, fileIDMeta)
}
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Expand Up @@ -528,6 +528,7 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
dataSize: readBlockIn.FileSize,
}

logging.Logger.Debug("calling GetMerkleProofOfMultipleIndexes", zap.Any("readBlockIn", readBlockIn), zap.Any("vmp", vmp))
nodes, indexes, err := vp.GetMerkleProofOfMultipleIndexes(file, nodesSize, startBlock, endBlock)
if err != nil {
return nil, common.NewError("get_merkle_proof_error", err.Error())
Expand Down
Expand Up @@ -288,7 +288,7 @@ type validationTreeProof struct {
// If endInd - startInd is whole file then no proof is required at all.
// startInd and endInd is taken as closed interval. So to get proof for data at index 0 both startInd
// and endInd would be 0.
func (v *validationTreeProof) GetMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
func (v *validationTreeProof) getMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
[][][]byte, [][]int, error) {

if startInd < 0 || endInd < 0 {
Expand Down
@@ -0,0 +1,59 @@
//go:build integration_tests
// +build integration_tests

package filestore

import (
"io"

crpc "github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"go.uber.org/zap"
)

func (v *validationTreeProof) GetMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
[][][]byte, [][]int, error) {
nodes, indexes, err := v.getMerkleProofOfMultipleIndexes(r, nodesSize, startInd, endInd)
if err != nil {
return nodes, indexes, err
}

state := crpc.Client().State()
if state == nil {
return nodes, indexes, err
}

logging.Logger.Debug("[conductor] just after getMerkleProofOfMultipleIndexes",
zap.Bool("miss_up_download", state.MissUpDownload),
zap.Int("start_ind", startInd),
zap.Int("end_ind", endInd),
zap.Int64("nodes_size", nodesSize),
zap.Int("output_nodes_size", len(nodes)),
zap.Int("output_indexes_size", len(indexes)),
zap.Any("nodes", nodes),
zap.Any("indexes", indexes),
)

if state.MissUpDownload {
logging.Logger.Debug("miss up/download",
zap.Bool("miss_up_download", state.MissUpDownload),
zap.Int("start_ind", startInd),
zap.Int("end_ind", endInd),
zap.Int64("nodes_size", nodesSize),
zap.Int("output_nodes_size", len(nodes)),
zap.Int("output_indexes_size", len(indexes)),
zap.Any("nodes", nodes),
zap.Any("indexes", indexes),
)

for i := range nodes {
nodes[i][0] = []byte("wrong data")
}

for i := range indexes {
indexes[i][0] = 0
}
}

return nodes, indexes, err
}
11 changes: 11 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/tree_validation_main.go
@@ -0,0 +1,11 @@
//go:build !integration_tests
// +build !integration_tests

package filestore

import "io"

func (v *validationTreeProof) GetMerkleProofOfMultipleIndexes(r io.ReadSeeker, nodesSize int64, startInd, endInd int) (
[][][]byte, [][]int, error) {
return v.getMerkleProofOfMultipleIndexes(r, nodesSize, startInd, endInd)
}
13 changes: 6 additions & 7 deletions code/go/0chain.net/blobbercore/handler/handler_download_test.go
Expand Up @@ -301,6 +301,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set(common.ClientSignatureHeader, sign)
r.Header.Set(common.ClientHeader, alloc.OwnerID)
r.Header.Set(common.ClientKeyHeader, alloc.OwnerPublicKey)
Expand Down Expand Up @@ -478,7 +479,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", pathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
Expand Down Expand Up @@ -559,7 +560,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", pathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
Expand Down Expand Up @@ -673,7 +674,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", filePathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
Expand Down Expand Up @@ -793,7 +794,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", filePathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
Expand Down Expand Up @@ -912,7 +913,7 @@ func TestHandlers_Download(t *testing.T) {
r.Header.Set("X-Path-Hash", filePathHash)
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
r.Header.Set("X-Num-Blocks", fmt.Sprintf("%d", 1))
r.Header.Set("X-Verify-Download", fmt.Sprint(false))
r.Header.Set("X-Verify-Download", fmt.Sprint(true))
r.Header.Set("X-Connection-ID", connectionID)
r.Header.Set("X-Mode", DownloadContentFull)
r.Header.Set("X-Auth-Token", authTicket)
Expand Down Expand Up @@ -1005,9 +1006,7 @@ func TestHandlers_Download(t *testing.T) {
m := make(map[string]interface{})
err = json.Unmarshal(data, &m)
require.NoError(t, err)

if test.wantCode != http.StatusOK || test.wantBody != "" {
fmt.Println("fprint", test.args.w.Body.String())
var body string
if m["Data"] != nil {
body = m["Data"].(string)
Expand Down
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/node"

"go.uber.org/zap"
Expand Down Expand Up @@ -379,6 +380,7 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
IsPrecommit: fromPreCommit,
}

logging.Logger.Info("calling GetFileBlock for thumb", zap.Any("rbi", rbi))
fileDownloadResponse, err = filestore.GetFileStore().GetFileBlock(rbi)
if err != nil {
return nil, common.NewErrorf("download_file", "couldn't get thumbnail block: %v", err)
Expand All @@ -398,6 +400,7 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
VerifyDownload: dr.VerifyDownload,
IsPrecommit: fromPreCommit,
}
logging.Logger.Info("calling GetFileBlock", zap.Any("rbi", rbi))
fileDownloadResponse, err = filestore.GetFileStore().GetFileBlock(rbi)
if err != nil {
return nil, common.NewErrorf("download_file", "couldn't get file block: %v", err)
Expand Down Expand Up @@ -433,6 +436,9 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
addDailyBlocks(clientID, dr.NumBlocks)
AddDownloadedData(clientID, dr.NumBlocks)
}()
if !dr.VerifyDownload {
return fileDownloadResponse.Data, nil
}
return fileDownloadResponse, nil
}

Expand Down
5 changes: 5 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/client.go
Expand Up @@ -53,6 +53,11 @@ func (c *client) blobberCommitted(blobberID string) (err error) {
return
}

func (c *client) validationTicketGenerated(ticket ValidtorTicket) (err error) {
err = c.client.Call("Server.ValidatorTicket", &ticket, nil)
return
}

func (c *client) sendFileMetaRoot(m map[string]string) (err error) {
err = c.client.Call("Server.GetFileMetaRoot", m, nil)
return
Expand Down
6 changes: 6 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/entity.go
Expand Up @@ -112,6 +112,12 @@ func (e *Entity) BlobberCommitted(blobberID string) {
}
}

func (e *Entity) ValidatorTicket(ticket ValidtorTicket) {
err := e.client.validationTicketGenerated(ticket)
if err != nil {
log.Println(err)
}
}

func (e *Entity) SendFileMetaRoot(blobberID, fileMetaRoot string, ctxCncl context.CancelFunc) {
m := map[string]string{
Expand Down
4 changes: 4 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/server.go
Expand Up @@ -16,3 +16,7 @@ type (
RoundName = config.RoundName
Number = config.Number
)

type ValidtorTicket struct {
ValidatorId string
}
3 changes: 3 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/state.go
Expand Up @@ -62,9 +62,12 @@ type State struct {
BlobberUpload BlobberUpload
BlobberDelete BlobberDelete
AdversarialValidator AdversarialValidator
NotifyOnValidationTicketGeneration bool
StopWMCommit *bool
FailRenameCommit []string
FailUploadCommit []string
GetFileMetaRoot bool
MissUpDownload bool
}

// Name returns NodeName by given NodeID.
Expand Down
7 changes: 6 additions & 1 deletion code/go/0chain.net/core/common/handler.go
Expand Up @@ -82,7 +82,12 @@ func ToByteStream(handler JSONResponderF) ReqRespHandlerf {
w.Write(rawdata) //nolint:errcheck
} else {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(data) //nolint:errcheck
byteData, err := json.Marshal(data)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
w.Write(byteData) //nolint:errcheck
}
}
}
Expand Down
Expand Up @@ -49,5 +49,11 @@ func ChallengeHandler(ctx context.Context, r *http.Request) (interface{}, error,

res, err := challengeHandler(ctx, r)

if state.NotifyOnValidationTicketGeneration {
conductrpc.Client().ValidatorTicket(conductrpc.ValidtorTicket{
ValidatorId: node.Self.ID,
})
}

return res, err, true
}
2 changes: 1 addition & 1 deletion docker.local/conductor-config/0chain_blobber.yaml
@@ -1,7 +1,7 @@
version: "1.0"

logging:
level: "info"
level: "debug"
console: true # printing log to console is only supported in development mode

info:
Expand Down
2 changes: 1 addition & 1 deletion docker.local/conductor-config/0chain_validator.yaml
Expand Up @@ -17,7 +17,7 @@ rate_limiters:
proxy: true

logging:
level: "error"
level: "debug"
console: true # printing log to console is only supported in development mode

healthcheck:
Expand Down

0 comments on commit 85646cd

Please sign in to comment.